vmware-nsx/neutron/tests/unit/db/vpn/test_db_vpnaas.py
Yong Sheng Gong 331de555f2 validate if the router has external gateway interface set
If the router wants to work with vpn service, we must
make sure external gateway interface is set.

This patch cannot prevent user from clearing the gateway interface
of the router after the vpnservice is created.

Change-Id: If0f00def949b31c1e3da7a2cd055454567201e4c
Closes-Bug: #1258379
2013-12-10 05:23:56 +08:00

1656 lines
75 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# (c) Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
import contextlib
import os
import webob.exc
from neutron.api.extensions import ExtensionMiddleware
from neutron.api.extensions import PluginAwareExtensionManager
from neutron.common import config
from neutron import context
from neutron.db import agentschedulers_db
from neutron.db import l3_agentschedulers_db
from neutron.db.vpn import vpn_db
from neutron import extensions
from neutron.extensions import vpnaas
from neutron import manager
from neutron.plugins.common import constants
from neutron.scheduler import l3_agent_scheduler
from neutron.services.vpn import plugin as vpn_plugin
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
DB_CORE_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
DB_VPN_PLUGIN_KLASS = "neutron.services.vpn.plugin.VPNPlugin"
ROOTDIR = os.path.normpath(os.path.join(
os.path.dirname(__file__),
'..', '..', '..', '..'))
extensions_path = ':'.join(extensions.__path__)
class TestVpnCorePlugin(test_l3_plugin.TestL3NatIntPlugin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin):
def __init__(self, configfile=None):
super(TestVpnCorePlugin, self).__init__()
self.router_scheduler = l3_agent_scheduler.ChanceScheduler()
class VPNPluginDbTestCase(test_l3_plugin.L3NatTestCaseMixin,
test_db_plugin.NeutronDbPluginV2TestCase):
resource_prefix_map = dict(
(k.replace('_', '-'),
constants.COMMON_PREFIXES[constants.VPN])
for k in vpnaas.RESOURCE_ATTRIBUTE_MAP
)
def setUp(self, core_plugin=None, vpnaas_plugin=DB_VPN_PLUGIN_KLASS):
service_plugins = {'vpnaas_plugin': vpnaas_plugin}
plugin_str = ('neutron.tests.unit.db.vpn.'
'test_db_vpnaas.TestVpnCorePlugin')
super(VPNPluginDbTestCase, self).setUp(
plugin_str,
service_plugins=service_plugins
)
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
self.core_plugin = TestVpnCorePlugin
self.plugin = vpn_plugin.VPNPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
{constants.CORE: self.core_plugin,
constants.VPN: self.plugin}
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
def _create_ikepolicy(self, fmt,
name='ikepolicy1',
auth_algorithm='sha1',
encryption_algorithm='aes-128',
phase1_negotiation_mode='main',
lifetime_units='seconds',
lifetime_value=3600,
ike_version='v1',
pfs='group5',
expected_res_status=None, **kwargs):
data = {'ikepolicy': {
'name': name,
'auth_algorithm': auth_algorithm,
'encryption_algorithm': encryption_algorithm,
'phase1_negotiation_mode': phase1_negotiation_mode,
'lifetime': {
'units': lifetime_units,
'value': lifetime_value},
'ike_version': ike_version,
'pfs': pfs,
'tenant_id': self._tenant_id
}}
for arg in ['description']:
if arg in kwargs and kwargs[arg] is not None:
data['ikepolicy'][arg] = kwargs[arg]
ikepolicy_req = self.new_create_request('ikepolicies', data, fmt)
ikepolicy_res = ikepolicy_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(ikepolicy_res.status_int, expected_res_status)
return ikepolicy_res
@contextlib.contextmanager
def ikepolicy(self, fmt=None,
name='ikepolicy1',
auth_algorithm='sha1',
encryption_algorithm='aes-128',
phase1_negotiation_mode='main',
lifetime_units='seconds',
lifetime_value=3600,
ike_version='v1',
pfs='group5',
no_delete=False,
**kwargs):
if not fmt:
fmt = self.fmt
res = self._create_ikepolicy(fmt,
name,
auth_algorithm,
encryption_algorithm,
phase1_negotiation_mode,
lifetime_units,
lifetime_value,
ike_version,
pfs,
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
ikepolicy = self.deserialize(fmt or self.fmt, res)
yield ikepolicy
finally:
if not no_delete:
self._delete('ikepolicies', ikepolicy['ikepolicy']['id'])
def _create_ipsecpolicy(self, fmt,
name='ipsecpolicy1',
auth_algorithm='sha1',
encryption_algorithm='aes-128',
encapsulation_mode='tunnel',
transform_protocol='esp',
lifetime_units='seconds',
lifetime_value=3600,
pfs='group5',
expected_res_status=None,
**kwargs):
data = {'ipsecpolicy': {'name': name,
'auth_algorithm': auth_algorithm,
'encryption_algorithm': encryption_algorithm,
'encapsulation_mode': encapsulation_mode,
'transform_protocol': transform_protocol,
'lifetime': {'units': lifetime_units,
'value': lifetime_value},
'pfs': pfs,
'tenant_id': self._tenant_id}}
for arg in ['description']:
if arg in kwargs and kwargs[arg] is not None:
data['ipsecpolicy'][arg] = kwargs[arg]
ipsecpolicy_req = self.new_create_request('ipsecpolicies', data, fmt)
ipsecpolicy_res = ipsecpolicy_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(ipsecpolicy_res.status_int, expected_res_status)
return ipsecpolicy_res
@contextlib.contextmanager
def ipsecpolicy(self, fmt=None,
name='ipsecpolicy1',
auth_algorithm='sha1',
encryption_algorithm='aes-128',
encapsulation_mode='tunnel',
transform_protocol='esp',
lifetime_units='seconds',
lifetime_value=3600,
pfs='group5',
no_delete=False, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_ipsecpolicy(fmt,
name,
auth_algorithm,
encryption_algorithm,
encapsulation_mode,
transform_protocol,
lifetime_units,
lifetime_value,
pfs,
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
ipsecpolicy = self.deserialize(fmt or self.fmt, res)
yield ipsecpolicy
finally:
if not no_delete:
self._delete('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'])
def _create_vpnservice(self, fmt, name,
admin_state_up,
router_id, subnet_id,
expected_res_status=None, **kwargs):
tenant_id = kwargs.get('tenant_id', self._tenant_id)
data = {'vpnservice': {'name': name,
'subnet_id': subnet_id,
'router_id': router_id,
'admin_state_up': admin_state_up,
'tenant_id': tenant_id}}
for arg in ['description']:
if arg in kwargs and kwargs[arg] is not None:
data['vpnservice'][arg] = kwargs[arg]
vpnservice_req = self.new_create_request('vpnservices', data, fmt)
if (kwargs.get('set_context') and
'tenant_id' in kwargs):
# create a specific auth context for this request
vpnservice_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
vpnservice_res = vpnservice_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(vpnservice_res.status_int, expected_res_status)
return vpnservice_res
@contextlib.contextmanager
def vpnservice(self, fmt=None, name='vpnservice1',
subnet=None,
router=None,
admin_state_up=True,
no_delete=False,
plug_subnet=True,
external_subnet_cidr='192.168.100.0/24',
external_router=True,
**kwargs):
if not fmt:
fmt = self.fmt
with contextlib.nested(
test_db_plugin.optional_ctx(subnet, self.subnet),
test_db_plugin.optional_ctx(router, self.router),
self.subnet(cidr=external_subnet_cidr)) as (tmp_subnet,
tmp_router,
public_sub):
if external_router:
self._set_net_external(
public_sub['subnet']['network_id'])
self._add_external_gateway_to_router(
tmp_router['router']['id'],
public_sub['subnet']['network_id'])
tmp_router['router']['external_gateway_info'] = {
'network_id': public_sub['subnet']['network_id']}
if plug_subnet:
self._router_interface_action(
'add',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
try:
res = self._create_vpnservice(fmt,
name,
admin_state_up,
router_id=(tmp_router['router']
['id']),
subnet_id=(tmp_subnet['subnet']
['id']),
**kwargs)
vpnservice = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(
code=res.status_int, detail=vpnservice)
yield vpnservice
finally:
if not no_delete and vpnservice.get('vpnservice'):
self._delete('vpnservices',
vpnservice['vpnservice']['id'])
if plug_subnet:
self._router_interface_action(
'remove',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
if external_router:
external_gateway = tmp_router['router'].get(
'external_gateway_info')
if external_gateway:
network_id = external_gateway['network_id']
self._remove_external_gateway_from_router(
tmp_router['router']['id'], network_id)
def _create_ipsec_site_connection(self, fmt, name='test',
peer_address='192.168.1.10',
peer_id='192.168.1.10',
peer_cidrs=None,
mtu=1500,
psk='abcdefg',
initiator='bi-directional',
dpd_action='hold',
dpd_interval=30,
dpd_timeout=120,
vpnservice_id='fake_id',
ikepolicy_id='fake_id',
ipsecpolicy_id='fake_id',
admin_state_up=True,
expected_res_status=None, **kwargs):
data = {
'ipsec_site_connection': {'name': name,
'peer_address': peer_address,
'peer_id': peer_id,
'peer_cidrs': peer_cidrs,
'mtu': mtu,
'psk': psk,
'initiator': initiator,
'dpd': {
'action': dpd_action,
'interval': dpd_interval,
'timeout': dpd_timeout,
},
'vpnservice_id': vpnservice_id,
'ikepolicy_id': ikepolicy_id,
'ipsecpolicy_id': ipsecpolicy_id,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}
}
for arg in ['description']:
if arg in kwargs and kwargs[arg] is not None:
data['ipsec_site_connection'][arg] = kwargs[arg]
ipsec_site_connection_req = self.new_create_request(
'ipsec-site-connections', data, fmt
)
ipsec_site_connection_res = ipsec_site_connection_req.get_response(
self.ext_api
)
if expected_res_status:
self.assertEqual(
ipsec_site_connection_res.status_int, expected_res_status
)
return ipsec_site_connection_res
@contextlib.contextmanager
def ipsec_site_connection(self, fmt=None, name='ipsec_site_connection1',
peer_address='192.168.1.10',
peer_id='192.168.1.10',
peer_cidrs=None,
mtu=1500,
psk='abcdefg',
initiator='bi-directional',
dpd_action='hold',
dpd_interval=30,
dpd_timeout=120,
vpnservice=None,
ikepolicy=None,
ipsecpolicy=None,
admin_state_up=True, no_delete=False,
**kwargs):
if not fmt:
fmt = self.fmt
with contextlib.nested(
test_db_plugin.optional_ctx(vpnservice,
self.vpnservice),
test_db_plugin.optional_ctx(ikepolicy,
self.ikepolicy),
test_db_plugin.optional_ctx(ipsecpolicy,
self.ipsecpolicy)
) as (tmp_vpnservice, tmp_ikepolicy, tmp_ipsecpolicy):
vpnservice_id = tmp_vpnservice['vpnservice']['id']
ikepolicy_id = tmp_ikepolicy['ikepolicy']['id']
ipsecpolicy_id = tmp_ipsecpolicy['ipsecpolicy']['id']
res = self._create_ipsec_site_connection(fmt,
name,
peer_address,
peer_id,
peer_cidrs,
mtu,
psk,
initiator,
dpd_action,
dpd_interval,
dpd_timeout,
vpnservice_id,
ikepolicy_id,
ipsecpolicy_id,
admin_state_up,
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
ipsec_site_connection = self.deserialize(
fmt or self.fmt, res
)
yield ipsec_site_connection
finally:
if not no_delete:
self._delete(
'ipsec-site-connections',
ipsec_site_connection[
'ipsec_site_connection']['id']
)
class TestVpnaas(VPNPluginDbTestCase):
def _check_policy(self, policy, keys, lifetime):
for k, v in keys:
self.assertEqual(policy[k], v)
for k, v in lifetime.iteritems():
self.assertEqual(policy['lifetime'][k], v)
def test_create_ikepolicy(self):
"""Test case to create an ikepolicy."""
name = "ikepolicy1"
description = 'ipsec-ikepolicy'
keys = [('name', name),
('description', 'ipsec-ikepolicy'),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('phase1_negotiation_mode', 'main'),
('ike_version', 'v1'),
('pfs', 'group5'),
('tenant_id', self._tenant_id)]
lifetime = {
'units': 'seconds',
'value': 3600}
with self.ikepolicy(name=name, description=description) as ikepolicy:
self._check_policy(ikepolicy['ikepolicy'], keys, lifetime)
def test_delete_ikepolicy(self):
"""Test case to delete an ikepolicy."""
with self.ikepolicy(no_delete=True) as ikepolicy:
req = self.new_delete_request('ikepolicies',
ikepolicy['ikepolicy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
def test_show_ikepolicy(self):
"""Test case to show or get an ikepolicy."""
name = "ikepolicy1"
description = 'ipsec-ikepolicy'
keys = [('name', name),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('phase1_negotiation_mode', 'main'),
('ike_version', 'v1'),
('pfs', 'group5'),
('tenant_id', self._tenant_id)]
lifetime = {
'units': 'seconds',
'value': 3600}
with self.ikepolicy(name=name, description=description) as ikepolicy:
req = self.new_show_request('ikepolicies',
ikepolicy['ikepolicy']['id'],
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self._check_policy(res['ikepolicy'], keys, lifetime)
def test_list_ikepolicies(self):
"""Test case to list all ikepolicies."""
name = "ikepolicy_list"
keys = [('name', name),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('phase1_negotiation_mode', 'main'),
('ike_version', 'v1'),
('pfs', 'group5'),
('tenant_id', self._tenant_id)]
lifetime = {
'units': 'seconds',
'value': 3600}
with self.ikepolicy(name=name) as ikepolicy:
keys.append(('id', ikepolicy['ikepolicy']['id']))
req = self.new_list_request('ikepolicies')
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res), 1)
for k, v in keys:
self.assertEqual(res['ikepolicies'][0][k], v)
for k, v in lifetime.iteritems():
self.assertEqual(res['ikepolicies'][0]['lifetime'][k], v)
def test_list_ikepolicies_with_sort_emulated(self):
"""Test case to list all ikepolicies."""
with contextlib.nested(self.ikepolicy(name='ikepolicy1'),
self.ikepolicy(name='ikepolicy2'),
self.ikepolicy(name='ikepolicy3')
) as (ikepolicy1, ikepolicy2, ikepolicy3):
self._test_list_with_sort('ikepolicy', (ikepolicy3,
ikepolicy2,
ikepolicy1),
[('name', 'desc')],
'ikepolicies')
def test_list_ikepolicies_with_pagination_emulated(self):
"""Test case to list all ikepolicies with pagination."""
with contextlib.nested(self.ikepolicy(name='ikepolicy1'),
self.ikepolicy(name='ikepolicy2'),
self.ikepolicy(name='ikepolicy3')
) as (ikepolicy1, ikepolicy2, ikepolicy3):
self._test_list_with_pagination('ikepolicy',
(ikepolicy1,
ikepolicy2,
ikepolicy3),
('name', 'asc'), 2, 2,
'ikepolicies')
def test_list_ikepolicies_with_pagination_reverse_emulated(self):
"""Test case to list all ikepolicies with reverse pagination."""
with contextlib.nested(self.ikepolicy(name='ikepolicy1'),
self.ikepolicy(name='ikepolicy2'),
self.ikepolicy(name='ikepolicy3')
) as (ikepolicy1, ikepolicy2, ikepolicy3):
self._test_list_with_pagination_reverse('ikepolicy',
(ikepolicy1,
ikepolicy2,
ikepolicy3),
('name', 'asc'), 2, 2,
'ikepolicies')
def test_update_ikepolicy(self):
"""Test case to update an ikepolicy."""
name = "new_ikepolicy1"
keys = [('name', name),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('phase1_negotiation_mode', 'main'),
('ike_version', 'v1'),
('pfs', 'group5'),
('tenant_id', self._tenant_id),
('lifetime', {'units': 'seconds',
'value': 60})]
with self.ikepolicy(name=name) as ikepolicy:
data = {'ikepolicy': {'name': name,
'lifetime': {'units': 'seconds',
'value': 60}}}
req = self.new_update_request("ikepolicies",
data,
ikepolicy['ikepolicy']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['ikepolicy'][k], v)
def test_create_ikepolicy_with_invalid_values(self):
"""Test case to test invalid values."""
name = 'ikepolicy1'
self._create_ikepolicy(name=name,
fmt=self.fmt,
auth_algorithm='md5',
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
auth_algorithm=200,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
encryption_algorithm='des',
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
encryption_algorithm=100,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
phase1_negotiation_mode='aggressive',
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
phase1_negotiation_mode=-100,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
ike_version='v6',
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
ike_version=500,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
pfs='group1',
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
pfs=120,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
lifetime_units='Megabytes',
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
lifetime_units=20000,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
lifetime_value=-20,
expected_res_status=400)
self._create_ikepolicy(name=name,
fmt=self.fmt,
lifetime_value='Megabytes',
expected_res_status=400)
def test_create_ipsecpolicy(self):
"""Test case to create an ipsecpolicy."""
name = "ipsecpolicy1"
description = 'my-ipsecpolicy'
keys = [('name', name),
('description', 'my-ipsecpolicy'),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('encapsulation_mode', 'tunnel'),
('transform_protocol', 'esp'),
('pfs', 'group5'),
('tenant_id', self._tenant_id)]
lifetime = {
'units': 'seconds',
'value': 3600}
with self.ipsecpolicy(name=name,
description=description) as ipsecpolicy:
self._check_policy(ipsecpolicy['ipsecpolicy'], keys, lifetime)
def test_delete_ipsecpolicy(self):
"""Test case to delete an ipsecpolicy."""
with self.ipsecpolicy(no_delete=True) as ipsecpolicy:
req = self.new_delete_request('ipsecpolicies',
ipsecpolicy['ipsecpolicy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
def test_show_ipsecpolicy(self):
"""Test case to show or get an ipsecpolicy."""
name = "ipsecpolicy1"
keys = [('name', name),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('encapsulation_mode', 'tunnel'),
('transform_protocol', 'esp'),
('pfs', 'group5'),
('tenant_id', self._tenant_id)]
lifetime = {
'units': 'seconds',
'value': 3600}
with self.ipsecpolicy(name=name) as ipsecpolicy:
req = self.new_show_request('ipsecpolicies',
ipsecpolicy['ipsecpolicy']['id'],
fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self._check_policy(res['ipsecpolicy'], keys, lifetime)
def test_list_ipsecpolicies(self):
"""Test case to list all ipsecpolicies."""
name = "ipsecpolicy_list"
keys = [('name', name),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('encapsulation_mode', 'tunnel'),
('transform_protocol', 'esp'),
('pfs', 'group5'),
('tenant_id', self._tenant_id)]
lifetime = {
'units': 'seconds',
'value': 3600}
with self.ipsecpolicy(name=name) as ipsecpolicy:
keys.append(('id', ipsecpolicy['ipsecpolicy']['id']))
req = self.new_list_request('ipsecpolicies')
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res), 1)
self._check_policy(res['ipsecpolicies'][0], keys, lifetime)
def test_list_ipsecpolicies_with_sort_emulated(self):
"""Test case to list all ipsecpolicies."""
with contextlib.nested(self.ipsecpolicy(name='ipsecpolicy1'),
self.ipsecpolicy(name='ipsecpolicy2'),
self.ipsecpolicy(name='ipsecpolicy3')
) as(ipsecpolicy1, ipsecpolicy2, ipsecpolicy3):
self._test_list_with_sort('ipsecpolicy', (ipsecpolicy3,
ipsecpolicy2,
ipsecpolicy1),
[('name', 'desc')],
'ipsecpolicies')
def test_list_ipsecpolicies_with_pagination_emulated(self):
"""Test case to list all ipsecpolicies with pagination."""
with contextlib.nested(self.ipsecpolicy(name='ipsecpolicy1'),
self.ipsecpolicy(name='ipsecpolicy2'),
self.ipsecpolicy(name='ipsecpolicy3')
) as(ipsecpolicy1, ipsecpolicy2, ipsecpolicy3):
self._test_list_with_pagination('ipsecpolicy',
(ipsecpolicy1,
ipsecpolicy2,
ipsecpolicy3),
('name', 'asc'), 2, 2,
'ipsecpolicies')
def test_list_ipsecpolicies_with_pagination_reverse_emulated(self):
"""Test case to list all ipsecpolicies with reverse pagination."""
with contextlib.nested(self.ipsecpolicy(name='ipsecpolicy1'),
self.ipsecpolicy(name='ipsecpolicy2'),
self.ipsecpolicy(name='ipsecpolicy3')
) as(ipsecpolicy1, ipsecpolicy2, ipsecpolicy3):
self._test_list_with_pagination_reverse('ipsecpolicy',
(ipsecpolicy1,
ipsecpolicy2,
ipsecpolicy3),
('name', 'asc'), 2, 2,
'ipsecpolicies')
def test_update_ipsecpolicy(self):
"""Test case to update an ipsecpolicy."""
name = "new_ipsecpolicy1"
keys = [('name', name),
('auth_algorithm', 'sha1'),
('encryption_algorithm', 'aes-128'),
('encapsulation_mode', 'tunnel'),
('transform_protocol', 'esp'),
('pfs', 'group5'),
('tenant_id', self._tenant_id),
('lifetime', {'units': 'seconds',
'value': 60})]
with self.ipsecpolicy(name=name) as ipsecpolicy:
data = {'ipsecpolicy': {'name': name,
'lifetime': {'units': 'seconds',
'value': 60}}}
req = self.new_update_request("ipsecpolicies",
data,
ipsecpolicy['ipsecpolicy']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['ipsecpolicy'][k], v)
def test_update_ipsecpolicy_lifetime(self):
with self.ipsecpolicy() as ipsecpolicy:
data = {'ipsecpolicy': {'lifetime': {'units': 'seconds'}}}
req = self.new_update_request("ipsecpolicies",
data,
ipsecpolicy['ipsecpolicy']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(res['ipsecpolicy']['lifetime']['units'],
'seconds')
data = {'ipsecpolicy': {'lifetime': {'value': 60}}}
req = self.new_update_request("ipsecpolicies",
data,
ipsecpolicy['ipsecpolicy']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(res['ipsecpolicy']['lifetime']['value'], 60)
def test_create_ipsecpolicy_with_invalid_values(self):
"""Test case to test invalid values."""
name = 'ipsecpolicy1'
self._create_ipsecpolicy(
fmt=self.fmt,
name=name, auth_algorithm='md5', expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt,
name=name, auth_algorithm=100, expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt,
name=name, encryption_algorithm='des', expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt,
name=name, encryption_algorithm=200, expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt,
name=name, transform_protocol='abcd', expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt,
name=name, transform_protocol=500, expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt,
name=name,
encapsulation_mode='unsupported', expected_res_status=400)
self._create_ipsecpolicy(name=name,
fmt=self.fmt,
encapsulation_mode=100,
expected_res_status=400)
self._create_ipsecpolicy(name=name,
fmt=self.fmt,
pfs='group9', expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt, name=name, pfs=-1, expected_res_status=400)
self._create_ipsecpolicy(
fmt=self.fmt, name=name, lifetime_units='minutes',
expected_res_status=400)
self._create_ipsecpolicy(fmt=self.fmt, name=name, lifetime_units=100,
expected_res_status=400)
self._create_ipsecpolicy(fmt=self.fmt, name=name,
lifetime_value=-800, expected_res_status=400)
self._create_ipsecpolicy(fmt=self.fmt, name=name,
lifetime_value='Megabytes',
expected_res_status=400)
def test_create_vpnservice(self, **extras):
"""Test case to create a vpnservice."""
description = 'my-vpn-service'
expected = {'name': 'vpnservice1',
'description': 'my-vpn-service',
'admin_state_up': True,
'status': 'PENDING_CREATE',
'tenant_id': self._tenant_id, }
expected.update(extras)
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
expected['router_id'] = router['router']['id']
expected['subnet_id'] = subnet['subnet']['id']
name = expected['name']
with self.vpnservice(name=name,
subnet=subnet,
router=router,
description=description,
**extras) as vpnservice:
self.assertEqual(dict((k, v) for k, v in
vpnservice['vpnservice'].items()
if k in expected),
expected)
def test_create_vpnservice_with_invalid_router(self):
"""Test case to create a vpnservice with other tenant's router"""
with self.network(
set_context=True,
tenant_id='tenant_a') as network:
with self.subnet(network=network,
cidr='10.2.0.0/24') as subnet:
with self.router(
set_context=True, tenant_id='tenant_a') as router:
router_id = router['router']['id']
subnet_id = subnet['subnet']['id']
self._create_vpnservice(
self.fmt, 'fake',
True, router_id, subnet_id,
expected_res_status=webob.exc.HTTPNotFound.code,
set_context=True, tenant_id='tenant_b')
def test_create_vpnservice_with_router_no_external_gateway(self):
"""Test case to create a vpnservice with inner router"""
error_code = 0
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
router_id = router['router']['id']
try:
with self.vpnservice(subnet=subnet,
router=router,
external_router=False):
pass
except webob.exc.HTTPClientError as e:
error_code, error_detail = (
e.status_code, e.detail['NeutronError']['message'])
self.assertEqual(400, error_code)
msg = str(vpnaas.RouterIsNotExternal(router_id=router_id))
self.assertEqual(msg, error_detail)
def test_create_vpnservice_with_nonconnected_subnet(self):
"""Test case to create a vpnservice with nonconnected subnet."""
with self.network() as network:
with self.subnet(network=network,
cidr='10.2.0.0/24') as subnet:
with self.router() as router:
router_id = router['router']['id']
subnet_id = subnet['subnet']['id']
self._create_vpnservice(
self.fmt, 'fake',
True, router_id, subnet_id,
expected_res_status=webob.exc.HTTPBadRequest.code)
def test_delete_router_in_use_by_vpnservice(self):
"""Test delete router in use by vpn service."""
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
with self.vpnservice(subnet=subnet,
router=router):
self._delete('routers', router['router']['id'],
expected_code=webob.exc.HTTPConflict.code)
def _set_active(self, model, resource_id):
service_plugin = manager.NeutronManager.get_service_plugins()[
constants.VPN]
adminContext = context.get_admin_context()
with adminContext.session.begin(subtransactions=True):
resource_db = service_plugin._get_resource(
adminContext,
model,
resource_id)
resource_db.status = constants.ACTIVE
def test_update_vpnservice(self):
"""Test case to update a vpnservice."""
name = 'new_vpnservice1'
keys = [('name', name)]
with contextlib.nested(
self.subnet(cidr='10.2.0.0/24'),
self.router()) as (subnet, router):
with self.vpnservice(name=name,
subnet=subnet,
router=router) as vpnservice:
keys.append(('subnet_id',
vpnservice['vpnservice']['subnet_id']))
keys.append(('router_id',
vpnservice['vpnservice']['router_id']))
data = {'vpnservice': {'name': name}}
self._set_active(vpn_db.VPNService,
vpnservice['vpnservice']['id'])
req = self.new_update_request(
'vpnservices',
data,
vpnservice['vpnservice']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vpnservice'][k], v)
def test_update_vpnservice_with_invalid_state(self):
"""Test case to update a vpnservice in invalid state ."""
name = 'new_vpnservice1'
keys = [('name', name)]
with contextlib.nested(
self.subnet(cidr='10.2.0.0/24'),
self.router()) as (subnet, router):
with self.vpnservice(name=name,
subnet=subnet,
router=router) as vpnservice:
keys.append(('subnet_id',
vpnservice['vpnservice']['subnet_id']))
keys.append(('router_id',
vpnservice['vpnservice']['router_id']))
data = {'vpnservice': {'name': name}}
req = self.new_update_request(
'vpnservices',
data,
vpnservice['vpnservice']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
res = self.deserialize(self.fmt, res)
self.assertIn(vpnservice['vpnservice']['id'],
res['NeutronError']['message'])
def test_delete_vpnservice(self):
"""Test case to delete a vpnservice."""
with self.vpnservice(name='vpnserver',
no_delete=True) as vpnservice:
req = self.new_delete_request('vpnservices',
vpnservice['vpnservice']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
def test_show_vpnservice(self):
"""Test case to show or get a vpnservice."""
name = "vpnservice1"
keys = [('name', name),
('description', ''),
('admin_state_up', True),
('status', 'PENDING_CREATE')]
with self.vpnservice(name=name) as vpnservice:
req = self.new_show_request('vpnservices',
vpnservice['vpnservice']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vpnservice'][k], v)
def test_list_vpnservices(self):
"""Test case to list all vpnservices."""
name = "vpnservice_list"
keys = [('name', name),
('description', ''),
('admin_state_up', True),
('status', 'PENDING_CREATE')]
with self.vpnservice(name=name) as vpnservice:
keys.append(('subnet_id', vpnservice['vpnservice']['subnet_id']))
keys.append(('router_id', vpnservice['vpnservice']['router_id']))
req = self.new_list_request('vpnservices')
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res), 1)
for k, v in keys:
self.assertEqual(res['vpnservices'][0][k], v)
def test_list_vpnservices_with_sort_emulated(self):
"""Test case to list all vpnservices with sorting."""
with self.subnet() as subnet:
with self.router() as router:
with contextlib.nested(
self.vpnservice(name='vpnservice1',
subnet=subnet,
router=router,
external_subnet_cidr='192.168.10.0/24',),
self.vpnservice(name='vpnservice2',
subnet=subnet,
router=router,
plug_subnet=False,
external_router=False,
external_subnet_cidr='192.168.11.0/24',),
self.vpnservice(name='vpnservice3',
subnet=subnet,
router=router,
plug_subnet=False,
external_router=False,
external_subnet_cidr='192.168.13.0/24',)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_sort('vpnservice', (vpnservice3,
vpnservice2,
vpnservice1),
[('name', 'desc')])
def test_list_vpnservice_with_pagination_emulated(self):
"""Test case to list all vpnservices with pagination."""
with self.subnet() as subnet:
with self.router() as router:
with contextlib.nested(
self.vpnservice(name='vpnservice1',
subnet=subnet,
router=router,
external_subnet_cidr='192.168.10.0/24'),
self.vpnservice(name='vpnservice2',
subnet=subnet,
router=router,
plug_subnet=False,
external_subnet_cidr='192.168.20.0/24',
external_router=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
router=router,
plug_subnet=False,
external_subnet_cidr='192.168.30.0/24',
external_router=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_pagination('vpnservice',
(vpnservice1,
vpnservice2,
vpnservice3),
('name', 'asc'), 2, 2)
def test_list_vpnservice_with_pagination_reverse_emulated(self):
"""Test case to list all vpnservices with reverse pagination."""
with self.subnet() as subnet:
with self.router() as router:
with contextlib.nested(
self.vpnservice(name='vpnservice1',
subnet=subnet,
router=router,
external_subnet_cidr='192.168.10.0/24'),
self.vpnservice(name='vpnservice2',
subnet=subnet,
router=router,
plug_subnet=False,
external_subnet_cidr='192.168.11.0/24',
external_router=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
router=router,
plug_subnet=False,
external_subnet_cidr='192.168.12.0/24',
external_router=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_pagination_reverse('vpnservice',
(vpnservice1,
vpnservice2,
vpnservice3),
('name', 'asc'),
2, 2)
def test_create_ipsec_site_connection_with_invalid_values(self):
"""Test case to create an ipsec_site_connection with invalid values."""
name = 'connection1'
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, peer_cidrs='myname', expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, mtu=-100, expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, dpd_action='unsupported', expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, dpd_interval=-1, expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, dpd_timeout=-200, expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name, initiator='unsupported', expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name,
dpd_interval=30,
dpd_timeout=20, expected_status_int=400)
self._create_ipsec_site_connection(
fmt=self.fmt,
name=name,
dpd_interval=100,
dpd_timeout=100, expected_status_int=400)
def _test_create_ipsec_site_connection(self, key_overrides=None,
setup_overrides=None,
expected_status_int=200):
"""Create ipsec_site_connection and check results."""
params = {'ikename': 'ikepolicy1',
'ipsecname': 'ipsecpolicy1',
'vpnsname': 'vpnservice1',
'subnet_cidr': '10.2.0.0/24',
'subnet_version': 4}
if setup_overrides is not None:
params.update(setup_overrides)
keys = {'name': 'connection1',
'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'initiator': 'bi-directional',
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'PENDING_CREATE',
'admin_state_up': True}
if key_overrides is not None:
keys.update(key_overrides)
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
with contextlib.nested(
self.ikepolicy(name=params['ikename']),
self.ipsecpolicy(name=params['ipsecname']),
self.subnet(cidr=params['subnet_cidr'],
ip_version=params['subnet_version']),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=params['vpnsname'], subnet=subnet,
router=router) as vpnservice1:
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = (
ipsecpolicy['ipsecpolicy']['id']
)
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
try:
with self.ipsec_site_connection(
self.fmt,
keys['name'],
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=keys['description']
) as ipsec_site_connection:
if expected_status_int != 200:
self.fail("Expected failure on create")
self._check_ipsec_site_connection(
ipsec_site_connection['ipsec_site_connection'],
keys,
dpd)
except webob.exc.HTTPClientError as ce:
self.assertEqual(ce.code, expected_status_int)
def test_create_ipsec_site_connection(self, **extras):
"""Test case to create an ipsec_site_connection."""
self._test_create_ipsec_site_connection(key_overrides=extras)
def test_create_ipsec_site_connection_invalid_mtu(self):
"""Test creating an ipsec_site_connection with invalid MTU."""
self._test_create_ipsec_site_connection(key_overrides={'mtu': 67},
expected_status_int=400)
ipv6_overrides = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'mtu': 1279}
ipv6_setup_params = {'subnet_cidr': 'fe80::a01:0/120',
'subnet_version': 6}
self._test_create_ipsec_site_connection(
key_overrides=ipv6_overrides,
setup_overrides=ipv6_setup_params,
expected_status_int=400)
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual(
keys,
dict((k, v) for k, v
in ipsec_site_connection.items()
if k in keys))
self.assertEqual(
dpd,
dict((k, v) for k, v
in ipsec_site_connection['dpd'].items()
if k in dpd))
def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection."""
with self.ipsec_site_connection(
no_delete=True) as ipsec_site_connection:
req = self.new_delete_request(
'ipsec-site-connections',
ipsec_site_connection['ipsec_site_connection']['id']
)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
def test_update_ipsec_site_connection(self):
"""Test case for valid updates to IPSec site connection."""
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
self._test_update_ipsec_site_connection(update={'dpd': dpd})
self._test_update_ipsec_site_connection(update={'mtu': 2000})
ipv6_settings = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'subnet_cidr': 'fe80::a02:0/120',
'subnet_version': 6}
self._test_update_ipsec_site_connection(update={'mtu': 2000},
overrides=ipv6_settings)
def test_update_ipsec_site_connection_with_invalid_dpd(self):
"""Test updates to ipsec_site_connection with invalid DPD settings."""
dpd1 = {'action': 'hold',
'interval': 100,
'timeout': 100}
self._test_update_ipsec_site_connection(
update={'dpd': dpd1},
expected_status_int=400)
dpd2 = {'action': 'hold',
'interval': 100,
'timeout': 60}
self._test_update_ipsec_site_connection(
update={'dpd': dpd2},
expected_status_int=400)
dpd3 = {'action': 'hold',
'interval': -50,
'timeout': -100}
self._test_update_ipsec_site_connection(
update={'dpd': dpd3},
expected_status_int=400)
def test_update_ipsec_site_connection_with_invalid_mtu(self):
"""Test updates to ipsec_site_connection with invalid MTU settings."""
self._test_update_ipsec_site_connection(
update={'mtu': 67}, expected_status_int=400)
ipv6_settings = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'subnet_cidr': 'fe80::a02:0/120',
'subnet_version': 6}
self._test_update_ipsec_site_connection(
update={'mtu': 1279},
overrides=ipv6_settings,
expected_status_int=400)
def test_update_ipsec_site_connection_with_invalid_state(self):
"""Test updating an ipsec_site_connection in invalid state."""
self._test_update_ipsec_site_connection(
overrides={'make_active': False},
expected_status_int=400)
def test_update_ipsec_site_connection_peer_cidrs(self):
"""Test updating an ipsec_site_connection for peer_cidrs."""
new_peers = {'peer_cidrs': ['192.168.4.0/24',
'192.168.5.0/24']}
self._test_update_ipsec_site_connection(
update=new_peers)
def _test_update_ipsec_site_connection(self,
update={'name': 'new name'},
overrides=None,
expected_status_int=200):
"""Creates and then updates ipsec_site_connection."""
keys = {'name': 'new_ipsec_site_connection',
'ikename': 'ikepolicy1',
'ipsecname': 'ipsecpolicy1',
'vpnsname': 'vpnservice1',
'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'initiator': 'bi-directional',
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
'admin_state_up': True,
'action': 'hold',
'interval': 40,
'timeout': 120,
'subnet_cidr': '10.2.0.0/24',
'subnet_version': 4,
'make_active': True}
if overrides is not None:
keys.update(overrides)
with contextlib.nested(
self.ikepolicy(name=keys['ikename']),
self.ipsecpolicy(name=keys['ipsecname']),
self.subnet(cidr=keys['subnet_cidr'],
ip_version=keys['subnet_version']),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=keys['vpnsname'], subnet=subnet,
router=router) as vpnservice1:
keys['vpnservice_id'] = vpnservice1['vpnservice']['id']
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id']
with self.ipsec_site_connection(
self.fmt,
keys['name'],
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
keys['action'],
keys['interval'],
keys['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=keys['description']
) as ipsec_site_connection:
data = {'ipsec_site_connection': update}
if keys.get('make_active', None):
self._set_active(
vpn_db.IPsecSiteConnection,
(ipsec_site_connection['ipsec_site_connection']
['id']))
req = self.new_update_request(
'ipsec-site-connections',
data,
ipsec_site_connection['ipsec_site_connection']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(expected_status_int, res.status_int)
if expected_status_int == 200:
res_dict = self.deserialize(self.fmt, res)
for k, v in update.items():
self.assertEqual(
res_dict['ipsec_site_connection'][k], v)
def test_show_ipsec_site_connection(self):
"""Test case to show a ipsec_site_connection."""
ikename = "ikepolicy1"
ipsecname = "ipsecpolicy1"
vpnsname = "vpnservice1"
name = "connection1"
description = "my-ipsec-connection"
keys = {'name': name,
'description': "my-ipsec-connection",
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'initiator': 'bi-directional',
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'PENDING_CREATE',
'admin_state_up': True}
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
router=router) as vpnservice1:
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id']
keys['vpnservice_id'] = vpnservice1['vpnservice']['id']
with self.ipsec_site_connection(
self.fmt,
name,
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description,
) as ipsec_site_connection:
req = self.new_show_request(
'ipsec-site-connections',
ipsec_site_connection[
'ipsec_site_connection']['id'],
fmt=self.fmt
)
res = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
self._check_ipsec_site_connection(
res['ipsec_site_connection'],
keys,
dpd)
def test_list_ipsec_site_connections_with_sort_emulated(self):
"""Test case to list all ipsec_site_connections with sort."""
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
with self.vpnservice(subnet=subnet,
router=router
) as vpnservice:
with contextlib.nested(
self.ipsec_site_connection(
name='connection1', vpnservice=vpnservice
),
self.ipsec_site_connection(
name='connection2', vpnservice=vpnservice
),
self.ipsec_site_connection(
name='connection3', vpnservice=vpnservice
)
) as(ipsec_site_connection1,
ipsec_site_connection2,
ipsec_site_connection3):
self._test_list_with_sort('ipsec-site-connection',
(ipsec_site_connection3,
ipsec_site_connection2,
ipsec_site_connection1),
[('name', 'desc')])
def test_list_ipsec_site_connections_with_pagination_emulated(self):
"""Test case to list all ipsec_site_connections with pagination."""
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
with self.vpnservice(subnet=subnet,
router=router
) as vpnservice:
with contextlib.nested(
self.ipsec_site_connection(
name='ipsec_site_connection1',
vpnservice=vpnservice
),
self.ipsec_site_connection(
name='ipsec_site_connection1',
vpnservice=vpnservice
),
self.ipsec_site_connection(
name='ipsec_site_connection1',
vpnservice=vpnservice
)
) as(ipsec_site_connection1,
ipsec_site_connection2,
ipsec_site_connection3):
self._test_list_with_pagination(
'ipsec-site-connection',
(ipsec_site_connection1,
ipsec_site_connection2,
ipsec_site_connection3),
('name', 'asc'), 2, 2)
def test_list_ipsec_site_conns_with_pagination_reverse_emulated(self):
"""Test to list all ipsec_site_connections with reverse pagination."""
with self.subnet(cidr='10.2.0.0/24') as subnet:
with self.router() as router:
with self.vpnservice(subnet=subnet,
router=router
) as vpnservice:
with contextlib.nested(
self.ipsec_site_connection(
name='connection1', vpnservice=vpnservice
),
self.ipsec_site_connection(
name='connection2', vpnservice=vpnservice
),
self.ipsec_site_connection(
name='connection3', vpnservice=vpnservice
)
) as(ipsec_site_connection1,
ipsec_site_connection2,
ipsec_site_connection3):
self._test_list_with_pagination_reverse(
'ipsec-site-connection',
(ipsec_site_connection1,
ipsec_site_connection2,
ipsec_site_connection3),
('name', 'asc'), 2, 2
)
def test_create_vpn(self):
"""Test case to create a vpn."""
vpns_name = "vpnservice1"
ike_name = "ikepolicy1"
ipsec_name = "ipsecpolicy1"
name1 = "ipsec_site_connection1"
with contextlib.nested(
self.ikepolicy(name=ike_name),
self.ipsecpolicy(name=ipsec_name),
self.vpnservice(name=vpns_name)) as (
ikepolicy, ipsecpolicy, vpnservice):
vpnservice_id = vpnservice['vpnservice']['id']
ikepolicy_id = ikepolicy['ikepolicy']['id']
ipsecpolicy_id = ipsecpolicy['ipsecpolicy']['id']
with self.ipsec_site_connection(
self.fmt,
name1,
'192.168.1.10',
'192.168.1.10',
['192.168.2.0/24',
'192.168.3.0/24'],
1500,
'abcdef',
'bi-directional',
'hold',
30,
120,
vpnservice,
ikepolicy,
ipsecpolicy,
True
) as vpnconn1:
vpnservice_req = self.new_show_request(
'vpnservices',
vpnservice_id,
fmt=self.fmt)
vpnservice_updated = self.deserialize(
self.fmt,
vpnservice_req.get_response(self.ext_api)
)
self.assertEqual(
vpnservice_updated['vpnservice']['id'],
vpnconn1['ipsec_site_connection']['vpnservice_id']
)
ikepolicy_req = self.new_show_request('ikepolicies',
ikepolicy_id,
fmt=self.fmt)
ikepolicy_res = self.deserialize(
self.fmt,
ikepolicy_req.get_response(self.ext_api)
)
self.assertEqual(
ikepolicy_res['ikepolicy']['id'],
vpnconn1['ipsec_site_connection']['ikepolicy_id'])
ipsecpolicy_req = self.new_show_request(
'ipsecpolicies',
ipsecpolicy_id,
fmt=self.fmt)
ipsecpolicy_res = self.deserialize(
self.fmt,
ipsecpolicy_req.get_response(self.ext_api)
)
self.assertEqual(
ipsecpolicy_res['ipsecpolicy']['id'],
vpnconn1['ipsec_site_connection']['ipsecpolicy_id']
)
def test_delete_ikepolicy_inuse(self):
"""Test case to delete an ikepolicy, that is in use."""
vpns_name = "vpnservice1"
ike_name = "ikepolicy1"
ipsec_name = "ipsecpolicy1"
name1 = "ipsec_site_connection1"
with self.ikepolicy(name=ike_name) as ikepolicy:
with self.ipsecpolicy(name=ipsec_name) as ipsecpolicy:
with self.vpnservice(name=vpns_name) as vpnservice:
with self.ipsec_site_connection(
self.fmt,
name1,
'192.168.1.10',
'192.168.1.10',
['192.168.2.0/24',
'192.168.3.0/24'],
1500,
'abcdef',
'bi-directional',
'hold',
30,
120,
vpnservice,
ikepolicy,
ipsecpolicy,
True
):
delete_req = self.new_delete_request(
'ikepolicies',
ikepolicy['ikepolicy']['id']
)
delete_res = delete_req.get_response(self.ext_api)
self.assertEqual(409, delete_res.status_int)
def test_delete_ipsecpolicy_inuse(self):
"""Test case to delete an ipsecpolicy, that is in use."""
vpns_name = "vpnservice1"
ike_name = "ikepolicy1"
ipsec_name = "ipsecpolicy1"
name1 = "ipsec_site_connection1"
with self.ikepolicy(name=ike_name) as ikepolicy:
with self.ipsecpolicy(name=ipsec_name) as ipsecpolicy:
with self.vpnservice(name=vpns_name) as vpnservice:
with self.ipsec_site_connection(
self.fmt,
name1,
'192.168.1.10',
'192.168.1.10',
['192.168.2.0/24',
'192.168.3.0/24'],
1500,
'abcdef',
'bi-directional',
'hold',
30,
120,
vpnservice,
ikepolicy,
ipsecpolicy,
True
):
delete_req = self.new_delete_request(
'ipsecpolicies',
ipsecpolicy['ipsecpolicy']['id']
)
delete_res = delete_req.get_response(self.ext_api)
self.assertEqual(409, delete_res.status_int)
class TestVpnaasXML(TestVpnaas):
fmt = 'xml'