Verify MTU is valid for ipsec_site_connection

Since the check relies on the vpnservice subnet to determine whether
this is IPv4 ro IPv6, we must check in the plugin. Test is done at
create/update time and ensures that the MTU is equal to or greater
than the minimum allowed values, which are set to 68 for IPv4
minimum and 1280 for IPv6, respectively.

Refactored code to allow reuse of create and update test functions,
by allowing tests to override some settings, and to provide a dict
of changed items (for update).

bug 1219489

Change-Id: I3e62ef786d3a02c761903a15d546ee8758c0bf7f
This commit is contained in:
Paul Michali 2013-09-02 07:02:12 -04:00 committed by Mark McClain
parent 71bbefbcc6
commit 0f94119482
3 changed files with 176 additions and 224 deletions

View File

@ -17,6 +17,7 @@
# #
# @author: Swaminathan Vasudevan, Hewlett-Packard. # @author: Swaminathan Vasudevan, Hewlett-Packard.
import netaddr
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
@ -37,6 +38,8 @@ from neutron.plugins.common import constants
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
IP_MIN_MTU = {4: 68, 6: 1280}
class IPsecPeerCidr(model_base.BASEV2): class IPsecPeerCidr(model_base.BASEV2):
"""Internal representation of a IPsec Peer Cidrs.""" """Internal representation of a IPsec Peer Cidrs."""
@ -241,6 +244,9 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
self._get_resource(context, self._get_resource(context,
IPsecPolicy, IPsecPolicy,
ipsec_sitecon['ipsecpolicy_id']) ipsec_sitecon['ipsecpolicy_id'])
self._check_mtu(context,
ipsec_sitecon['mtu'],
ipsec_sitecon['vpnservice_id'])
ipsec_site_conn_db = IPsecSiteConnection( ipsec_site_conn_db = IPsecSiteConnection(
id=uuidutils.generate_uuid(), id=uuidutils.generate_uuid(),
tenant_id=tenant_id, tenant_id=tenant_id,
@ -276,6 +282,13 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
raise vpnaas.IPsecSiteConnectionDpdIntervalValueError( raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
attr='dpd_timeout') attr='dpd_timeout')
def _check_mtu(self, context, mtu, vpnservice_id):
vpn_service_db = self._get_vpnservice(context, vpnservice_id)
subnet = vpn_service_db.subnet['cidr']
version = netaddr.IPNetwork(subnet).version
if mtu < IP_MIN_MTU[version]:
raise vpnaas.IPsecSiteConnectionMtuError(mtu=mtu, version=version)
def update_ipsec_site_connection( def update_ipsec_site_connection(
self, context, self, context,
ipsec_site_conn_id, ipsec_site_connection): ipsec_site_conn_id, ipsec_site_connection):
@ -296,6 +309,11 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
'timeout', ipsec_site_conn_db.dpd_timeout) 'timeout', ipsec_site_conn_db.dpd_timeout)
self._check_dpd(conn) self._check_dpd(conn)
if 'mtu' in conn:
self._check_mtu(context,
conn['mtu'],
ipsec_site_conn_db.vpnservice_id)
self.assert_update_allowed(ipsec_site_conn_db) self.assert_update_allowed(ipsec_site_conn_db)
if "peer_cidrs" in conn: if "peer_cidrs" in conn:

View File

@ -44,6 +44,11 @@ class IPsecSiteConnectionDpdIntervalValueError(qexception.InvalidInput):
"equal to or less than dpd_interval") "equal to or less than dpd_interval")
class IPsecSiteConnectionMtuError(qexception.InvalidInput):
message = _("ipsec_site_connection MTU %(mtu)d is too small "
"for ipv%(version)s")
class IKEPolicyNotFound(qexception.NotFound): class IKEPolicyNotFound(qexception.NotFound):
message = _("IKEPolicy %(ikepolicy_id)s could not be found") message = _("IKEPolicy %(ikepolicy_id)s could not be found")

View File

@ -962,15 +962,19 @@ class TestVpnaas(VPNPluginDbTestCase):
dpd_interval=100, dpd_interval=100,
dpd_timeout=100, expected_status_int=400) dpd_timeout=100, expected_status_int=400)
def test_create_ipsec_site_connection(self, **extras): def _test_create_ipsec_site_connection(self, key_overrides=None,
"""Test case to create an ipsec_site_connection.""" setup_overrides=None,
ikename = "ikepolicy1" expected_status_int=200):
ipsecname = "ipsecpolicy1" """Create ipsec_site_connection and check results."""
vpnsname = "vpnservice1" params = {'ikename': 'ikepolicy1',
name = "connection1" 'ipsecname': 'ipsecpolicy1',
description = "my-ipsec-connection" 'vpnsname': 'vpnservice1',
keys = {'name': name, 'subnet_cidr': '10.2.0.0/24',
'description': "my-ipsec-connection", 'subnet_version': 4}
if setup_overrides is not None:
params.update(setup_overrides)
keys = {'name': 'connection1',
'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10', 'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10', 'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'], 'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
@ -980,17 +984,19 @@ class TestVpnaas(VPNPluginDbTestCase):
'psk': 'abcd', 'psk': 'abcd',
'status': 'PENDING_CREATE', 'status': 'PENDING_CREATE',
'admin_state_up': True} 'admin_state_up': True}
if key_overrides is not None:
keys.update(key_overrides)
dpd = {'action': 'hold', dpd = {'action': 'hold',
'interval': 40, 'interval': 40,
'timeout': 120} 'timeout': 120}
keys.update(extras)
with contextlib.nested( with contextlib.nested(
self.ikepolicy(name=ikename), self.ikepolicy(name=params['ikename']),
self.ipsecpolicy(name=ipsecname), self.ipsecpolicy(name=params['ipsecname']),
self.subnet(), self.subnet(cidr=params['subnet_cidr'],
ip_version=params['subnet_version']),
self.router()) as ( self.router()) as (
ikepolicy, ipsecpolicy, subnet, router): ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet, with self.vpnservice(name=params['vpnsname'], subnet=subnet,
router=router) as vpnservice1: router=router) as vpnservice1:
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id'] keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = ( keys['ipsecpolicy_id'] = (
@ -999,9 +1005,10 @@ class TestVpnaas(VPNPluginDbTestCase):
keys['vpnservice_id'] = ( keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id'] vpnservice1['vpnservice']['id']
) )
try:
with self.ipsec_site_connection( with self.ipsec_site_connection(
self.fmt, self.fmt,
name, keys['name'],
keys['peer_address'], keys['peer_address'],
keys['peer_id'], keys['peer_id'],
keys['peer_cidrs'], keys['peer_cidrs'],
@ -1015,23 +1022,48 @@ class TestVpnaas(VPNPluginDbTestCase):
ikepolicy, ikepolicy,
ipsecpolicy, ipsecpolicy,
keys['admin_state_up'], keys['admin_state_up'],
description=description, description=keys['description']
**extras
) as ipsec_site_connection: ) as ipsec_site_connection:
if expected_status_int != 200:
self.fail("Expected failure on create")
self._check_ipsec_site_connection( self._check_ipsec_site_connection(
ipsec_site_connection['ipsec_site_connection'], ipsec_site_connection['ipsec_site_connection'],
keys, keys,
dpd) dpd)
except webob.exc.HTTPClientError as ce:
self.assertEqual(ce.code, expected_status_int)
def test_create_ipsec_site_connection(self, **extras):
"""Test case to create an ipsec_site_connection."""
self._test_create_ipsec_site_connection(key_overrides=extras)
def test_create_ipsec_site_connection_invalid_mtu(self):
"""Test creating an ipsec_site_connection with invalid MTU."""
self._test_create_ipsec_site_connection(key_overrides={'mtu': 67},
expected_status_int=400)
ipv6_overrides = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'mtu': 1279}
ipv6_setup_params = {'subnet_cidr': 'fe80::a01:0/120',
'subnet_version': 6}
self._test_create_ipsec_site_connection(
key_overrides=ipv6_overrides,
setup_overrides=ipv6_setup_params,
expected_status_int=400)
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd): def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual( self.assertEqual(
keys,
dict((k, v) for k, v dict((k, v) for k, v
in ipsec_site_connection.items() in ipsec_site_connection.items()
if k in keys), keys) if k in keys))
self.assertEqual( self.assertEqual(
dpd,
dict((k, v) for k, v dict((k, v) for k, v
in ipsec_site_connection['dpd'].items() in ipsec_site_connection['dpd'].items()
if k in dpd), dpd) if k in dpd))
def test_delete_ipsec_site_connection(self): def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection.""" """Test case to delete a ipsec_site_connection."""
@ -1045,14 +1077,23 @@ class TestVpnaas(VPNPluginDbTestCase):
self.assertEqual(res.status_int, 204) self.assertEqual(res.status_int, 204)
def test_update_ipsec_site_connection(self): def test_update_ipsec_site_connection(self):
"""Test case for valid updates to IPSec site connection."""
dpd = {'action': 'hold', dpd = {'action': 'hold',
'interval': 40, 'interval': 40,
'timeout': 120} 'timeout': 120}
self._test_update_ipsec_site_connection( self._test_update_ipsec_site_connection(update={'dpd': dpd})
update={'dpd': dpd} self._test_update_ipsec_site_connection(update={'mtu': 2000})
) ipv6_settings = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'subnet_cidr': 'fe80::a02:0/120',
'subnet_version': 6}
self._test_update_ipsec_site_connection(update={'mtu': 2000},
overrides=ipv6_settings)
def test_update_ipsec_site_connection_with_invalid_dpd(self): def test_update_ipsec_site_connection_with_invalid_dpd(self):
"""Test updates to ipsec_site_connection with invalid DPD settings."""
dpd1 = {'action': 'hold', dpd1 = {'action': 'hold',
'interval': 100, 'interval': 100,
'timeout': 100} 'timeout': 100}
@ -1072,159 +1113,44 @@ class TestVpnaas(VPNPluginDbTestCase):
update={'dpd': dpd3}, update={'dpd': dpd3},
expected_status_int=400) expected_status_int=400)
def _test_update_ipsec_site_connection( def test_update_ipsec_site_connection_with_invalid_mtu(self):
self, update=None, expected_status_int=200): """Test updates to ipsec_site_connection with invalid MTU settings."""
"""Test case to update a ipsec_site_connection.""" self._test_update_ipsec_site_connection(
name = 'new_ipsec_site_connection' update={'mtu': 67}, expected_status_int=400)
ikename = 'ikepolicy1' ipv6_settings = {
ipsecname = 'ipsecpolicy1' 'peer_address': 'fe80::c0a8:10a',
vpnsname = 'vpnservice1' 'peer_id': 'fe80::c0a8:10a',
description = 'my-ipsec-connection' 'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
keys = {'name': 'new_ipsec_site_connection', 'subnet_cidr': 'fe80::a02:0/120',
'description': "my-ipsec-connection", 'subnet_version': 6}
'peer_address': '192.168.1.10', self._test_update_ipsec_site_connection(
'peer_id': '192.168.1.10', update={'mtu': 1279},
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'], overrides=ipv6_settings,
'initiator': 'bi-directional', expected_status_int=400)
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
'admin_state_up': True}
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(cidr='10.2.0.0/24'),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
router=router) as vpnservice1:
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
keys['ikepolicy_id'] = (
ikepolicy['ikepolicy']['id']
)
keys['ipsecpolicy_id'] = (
ipsecpolicy['ipsecpolicy']['id']
)
with self.ipsec_site_connection(
self.fmt,
name,
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description
) as ipsec_site_connection:
if not update:
update = {'name': name}
data = {'ipsec_site_connection': update}
self._set_active(
vpn_db.IPsecSiteConnection,
ipsec_site_connection['ipsec_site_connection']['id'])
req = self.new_update_request(
'ipsec-site-connections',
data,
ipsec_site_connection['ipsec_site_connection']['id']
)
res = req.get_response(self.ext_api)
self.assertEqual(expected_status_int, res.status_int)
if expected_status_int == 200:
res_dict = self.deserialize(
self.fmt,
res
)
for k, v in update.items():
self.assertEqual(
res_dict['ipsec_site_connection'][k], v)
def test_update_ipsec_site_connection_with_invalid_state(self): def test_update_ipsec_site_connection_with_invalid_state(self):
"""Test case to update an ipsec_site_connection in invalid state.""" """Test updating an ipsec_site_connection in invalid state."""
name = 'new_ipsec_site_connection' self._test_update_ipsec_site_connection(
ikename = 'ikepolicy1' overrides={'make_active': False},
ipsecname = 'ipsecpolicy1' expected_status_int=400)
vpnsname = 'vpnservice1'
description = 'my-ipsec-connection'
keys = {'name': 'new_ipsec_site_connection',
'description': "my-ipsec-connection",
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'initiator': 'bi-directional',
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
'admin_state_up': True}
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(cidr='10.2.0.0/24'),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
router=router) as vpnservice1:
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
keys['ikepolicy_id'] = (
ikepolicy['ikepolicy']['id']
)
keys['ipsecpolicy_id'] = (
ipsecpolicy['ipsecpolicy']['id']
)
with self.ipsec_site_connection(
self.fmt,
name,
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description
) as ipsec_site_connection:
data = {'ipsec_site_connection': {'name': name}}
req = self.new_update_request(
'ipsec-site-connections',
data,
ipsec_site_connection['ipsec_site_connection']['id']
)
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
def test_update_ipsec_site_connection_peer_cidrs(self): def test_update_ipsec_site_connection_peer_cidrs(self):
"""Test case to update a ipsec_site_connection for peer_cidrs.""" """Test updating an ipsec_site_connection for peer_cidrs."""
name = 'ipsec_site_connection' new_peers = {'peer_cidrs': ['192.168.4.0/24',
ikename = 'ikepolicy1' '192.168.5.0/24']}
ipsecname = 'ipsecpolicy1' self._test_update_ipsec_site_connection(
vpnsname = 'vpnservice1' update=new_peers)
description = 'my-ipsec-connection'
keys = {'name': 'ipsec_site_connection', def _test_update_ipsec_site_connection(self,
'description': "my-ipsec-connection", update={'name': 'new name'},
overrides=None,
expected_status_int=200):
"""Creates and then updates ipsec_site_connection."""
keys = {'name': 'new_ipsec_site_connection',
'ikename': 'ikepolicy1',
'ipsecname': 'ipsecpolicy1',
'vpnsname': 'vpnservice1',
'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10', 'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10', 'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'], 'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
@ -1233,60 +1159,63 @@ class TestVpnaas(VPNPluginDbTestCase):
'tenant_id': self._tenant_id, 'tenant_id': self._tenant_id,
'psk': 'abcd', 'psk': 'abcd',
'status': 'ACTIVE', 'status': 'ACTIVE',
'admin_state_up': True} 'admin_state_up': True,
dpd = {'action': 'hold', 'action': 'hold',
'interval': 40, 'interval': 40,
'timeout': 120} 'timeout': 120,
'subnet_cidr': '10.2.0.0/24',
'subnet_version': 4,
'make_active': True}
if overrides is not None:
keys.update(overrides)
with contextlib.nested( with contextlib.nested(
self.ikepolicy(name=ikename), self.ikepolicy(name=keys['ikename']),
self.ipsecpolicy(name=ipsecname), self.ipsecpolicy(name=keys['ipsecname']),
self.subnet(cidr='10.2.0.0/24'), self.subnet(cidr=keys['subnet_cidr'],
ip_version=keys['subnet_version']),
self.router()) as ( self.router()) as (
ikepolicy, ipsecpolicy, subnet, router): ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet, with self.vpnservice(name=keys['vpnsname'], subnet=subnet,
router=router) as vpnservice1: router=router) as vpnservice1:
keys['vpnservice_id'] = vpnservice1['vpnservice']['id'] keys['vpnservice_id'] = vpnservice1['vpnservice']['id']
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id'] keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id'] keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id']
with self.ipsec_site_connection( with self.ipsec_site_connection(
self.fmt, self.fmt,
name, keys['name'],
keys['peer_address'], keys['peer_address'],
keys['peer_id'], keys['peer_id'],
keys['peer_cidrs'], keys['peer_cidrs'],
keys['mtu'], keys['mtu'],
keys['psk'], keys['psk'],
keys['initiator'], keys['initiator'],
dpd['action'], keys['action'],
dpd['interval'], keys['interval'],
dpd['timeout'], keys['timeout'],
vpnservice1, vpnservice1,
ikepolicy, ikepolicy,
ipsecpolicy, ipsecpolicy,
keys['admin_state_up'], keys['admin_state_up'],
description=description description=keys['description']
) as ipsec_site_connection: ) as ipsec_site_connection:
data = {'ipsec_site_connection': { data = {'ipsec_site_connection': update}
'peer_cidrs': ['192.168.2.0/24', if keys.get('make_active', None):
'192.168.3.0/24']
}}
self._set_active( self._set_active(
vpn_db.IPsecSiteConnection, vpn_db.IPsecSiteConnection,
ipsec_site_connection['ipsec_site_connection']['id']) (ipsec_site_connection['ipsec_site_connection']
['id']))
req = self.new_update_request( req = self.new_update_request(
'ipsec-site-connections', 'ipsec-site-connections',
data, data,
ipsec_site_connection[ ipsec_site_connection['ipsec_site_connection']['id'])
'ipsec_site_connection']['id'] res = req.get_response(self.ext_api)
) self.assertEqual(expected_status_int, res.status_int)
res = self.deserialize( if expected_status_int == 200:
self.fmt, res_dict = self.deserialize(self.fmt, res)
req.get_response(self.ext_api) for k, v in update.items():
) self.assertEqual(
self._check_ipsec_site_connection( res_dict['ipsec_site_connection'][k], v)
res['ipsec_site_connection'],
keys,
dpd)
def test_show_ipsec_site_connection(self): def test_show_ipsec_site_connection(self):
"""Test case to show a ipsec_site_connection.""" """Test case to show a ipsec_site_connection."""