a4e35f0ad3
Use policy MDproxy objects. The initialization supports both MP & Policy MD proxy, and uses the one the was configured, so there is no need for migration or upgrade process. In addition, remove the dependency between the AZs and _has_native_dhcp_metadata which existed for the v3 plugin long time ago. Change-Id: I7df49d6a347715cf1ec5453f921e67bd71b1f225
2301 lines
107 KiB
Python
2301 lines
107 KiB
Python
# Copyright (c) 2018 OpenStack Foundation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import mock
|
|
|
|
from oslo_config import cfg
|
|
from oslo_utils import uuidutils
|
|
from webob import exc
|
|
|
|
from neutron.extensions import address_scope
|
|
from neutron.extensions import l3
|
|
from neutron.extensions import securitygroup as secgrp
|
|
from neutron.tests.unit.db import test_db_base_plugin_v2
|
|
from neutron.tests.unit.extensions import test_address_scope
|
|
from neutron.tests.unit.extensions import test_extraroute as test_ext_route
|
|
from neutron.tests.unit.extensions import test_l3 as test_l3_plugin
|
|
from neutron.tests.unit.extensions import test_securitygroup
|
|
|
|
from neutron_lib.api.definitions import external_net as extnet_apidef
|
|
from neutron_lib.api.definitions import extraroute as xroute_apidef
|
|
from neutron_lib.api.definitions import l3_ext_gw_mode as l3_egm_apidef
|
|
from neutron_lib.api.definitions import port_security as psec
|
|
from neutron_lib.api.definitions import portbindings
|
|
from neutron_lib.api.definitions import provider_net as pnet
|
|
from neutron_lib.api.definitions import vlantransparent as vlan_apidef
|
|
from neutron_lib.callbacks import events
|
|
from neutron_lib.callbacks import exceptions as nc_exc
|
|
from neutron_lib.callbacks import registry
|
|
from neutron_lib.callbacks import resources
|
|
from neutron_lib import constants
|
|
from neutron_lib import context
|
|
from neutron_lib import exceptions as n_exc
|
|
from neutron_lib.objects import registry as obj_reg
|
|
from neutron_lib.plugins import directory
|
|
|
|
from vmware_nsx.common import utils
|
|
from vmware_nsx.extensions import providersecuritygroup as provider_sg
|
|
from vmware_nsx.plugins.common import plugin as com_plugin
|
|
from vmware_nsx.plugins.nsx_p import plugin as nsx_plugin
|
|
from vmware_nsx.services.lbaas.nsx_p.implementation import loadbalancer_mgr
|
|
from vmware_nsx.services.lbaas.octavia import octavia_listener
|
|
|
|
from vmware_nsx.tests import unit as vmware
|
|
from vmware_nsx.tests.unit.common_plugin import common_v3
|
|
from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
|
from vmware_nsxlib.v3 import nsx_constants
|
|
from vmware_nsxlib.v3.policy import constants as policy_constants
|
|
from vmware_nsxlib.v3 import utils as nsxlib_utils
|
|
|
|
|
|
PLUGIN_NAME = 'vmware_nsx.plugin.NsxPolicyPlugin'
|
|
NSX_OVERLAY_TZ_NAME = 'OVERLAY_TZ'
|
|
NSX_VLAN_TZ_NAME = 'VLAN_TZ'
|
|
DEFAULT_TIER0_ROUTER_UUID = "efad0078-9204-4b46-a2d8-d4dd31ed448f"
|
|
NSX_DHCP_PROFILE_ID = 'DHCP_PROFILE'
|
|
NSX_MD_PROXY_ID = 'MD_PROXY'
|
|
LOGICAL_SWITCH_ID = '00000000-1111-2222-3333-444444444444'
|
|
|
|
|
|
def _return_id_key(*args, **kwargs):
|
|
return {'id': uuidutils.generate_uuid()}
|
|
|
|
|
|
def _return_id_key_list(*args, **kwargs):
|
|
return [{'id': uuidutils.generate_uuid()}]
|
|
|
|
|
|
def _return_same(key, *args, **kwargs):
|
|
return key
|
|
|
|
|
|
class NsxPPluginTestCaseMixin(
|
|
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
|
|
|
|
def setUp(self, plugin=PLUGIN_NAME,
|
|
ext_mgr=None,
|
|
service_plugins=None, **kwargs):
|
|
|
|
self._mock_nsx_policy_backend_calls()
|
|
self._mock_nsxlib_backend_calls()
|
|
self.setup_conf_overrides()
|
|
super(NsxPPluginTestCaseMixin, self).setUp(plugin=plugin,
|
|
ext_mgr=ext_mgr)
|
|
self.ctx = context.get_admin_context()
|
|
|
|
def _mock_nsx_policy_backend_calls(self):
|
|
resource_list_result = {'results': [{'id': 'test',
|
|
'display_name': 'test'}]}
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.policy.NsxPolicyLib.get_version",
|
|
return_value=nsx_constants.NSX_VERSION_3_0_0).start()
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.client.RESTClient.get").start()
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.client.RESTClient.list",
|
|
return_value=resource_list_result).start()
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.client.RESTClient.patch").start()
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.client.RESTClient.update").start()
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.client.RESTClient.delete").start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyCommunicationMapApi._get_last_seq_num",
|
|
return_value=-1).start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyResourceBase._wait_until_realized",
|
|
return_value={'state': policy_constants.STATE_REALIZED}
|
|
).start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update_transport_zone").start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentApi.get_realized_logical_switch_id",
|
|
return_value=LOGICAL_SWITCH_ID
|
|
).start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentApi.get_realized_id",
|
|
return_value=LOGICAL_SWITCH_ID
|
|
).start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentApi.set_admin_state").start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentPortApi.set_admin_state").start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources.NsxPolicyTier0Api."
|
|
"get_edge_cluster_path", return_value="x/1").start()
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyEdgeClusterApi.get_edge_node_ids",
|
|
return_value=["node1"]).start()
|
|
mock.patch("vmware_nsxlib.v3.NsxLib.get_tag_limits",
|
|
return_value=nsxlib_utils.TagLimits(20, 40, 15)).start()
|
|
# Add some nsxlib mocks for the passthrough apis
|
|
mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
|
|
return_value=nsx_constants.NSX_VERSION_3_0_0).start()
|
|
mock.patch("vmware_nsxlib.v3.core_resources.NsxLibLogicalRouter."
|
|
"update").start()
|
|
mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportNode."
|
|
"get_transport_zones",
|
|
return_value=[NSX_OVERLAY_TZ_NAME,
|
|
NSX_VLAN_TZ_NAME, mock.ANY]).start()
|
|
mock.patch("vmware_nsxlib.v3.core_resources.NsxLibEdgeCluster."
|
|
"get_transport_nodes", return_value=["dummy"]).start()
|
|
|
|
def _mock_nsxlib_backend_calls(self):
|
|
"""Mock nsxlib backend calls used as passthrough
|
|
until implemented by policy
|
|
"""
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.core_resources.NsxLibDhcpProfile."
|
|
"get_id_by_name_or_id",
|
|
return_value=NSX_DHCP_PROFILE_ID).start()
|
|
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.resources.LogicalDhcpServer."
|
|
"get_id_by_name_or_id",
|
|
return_value=_return_same).start()
|
|
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.core_resources.NsxLibMetadataProxy."
|
|
"get_id_by_name_or_id",
|
|
side_effect=_return_same).start()
|
|
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.resources.LogicalPort.create",
|
|
side_effect=_return_id_key).start()
|
|
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.resources.LogicalDhcpServer.create",
|
|
side_effect=_return_id_key).start()
|
|
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.resources.LogicalDhcpServer.update",
|
|
side_effect=_return_id_key).start()
|
|
|
|
mock.patch(
|
|
"vmware_nsxlib.v3.resources.LogicalDhcpServer.create_binding",
|
|
side_effect=_return_id_key).start()
|
|
|
|
mock.patch("vmware_nsxlib.v3.resources.LogicalDhcpServer."
|
|
"update_binding").start()
|
|
|
|
mock.patch("vmware_nsxlib.v3.NsxLib."
|
|
"get_id_by_resource_and_tag").start()
|
|
|
|
def setup_conf_overrides(self):
|
|
cfg.CONF.set_override('default_overlay_tz', NSX_OVERLAY_TZ_NAME,
|
|
'nsx_p')
|
|
cfg.CONF.set_override('default_vlan_tz', NSX_VLAN_TZ_NAME, 'nsx_p')
|
|
cfg.CONF.set_override('dhcp_profile', NSX_DHCP_PROFILE_ID, 'nsx_p')
|
|
cfg.CONF.set_override('metadata_proxy', NSX_MD_PROXY_ID, 'nsx_p')
|
|
cfg.CONF.set_override('dhcp_agent_notification', False)
|
|
|
|
def _create_network(self, fmt, name, admin_state_up,
|
|
arg_list=None, providernet_args=None,
|
|
set_context=False, tenant_id=None,
|
|
**kwargs):
|
|
tenant_id = tenant_id or self._tenant_id
|
|
data = {'network': {'name': name,
|
|
'admin_state_up': admin_state_up,
|
|
'tenant_id': tenant_id}}
|
|
# Fix to allow the router:external attribute and any other
|
|
# attributes containing a colon to be passed with
|
|
# a double underscore instead
|
|
kwargs = dict((k.replace('__', ':'), v) for k, v in kwargs.items())
|
|
if extnet_apidef.EXTERNAL in kwargs:
|
|
arg_list = (extnet_apidef.EXTERNAL, ) + (arg_list or ())
|
|
|
|
if providernet_args:
|
|
kwargs.update(providernet_args)
|
|
for arg in (('admin_state_up', 'tenant_id', 'shared',
|
|
'availability_zone_hints') + (arg_list or ())):
|
|
# Arg must be present
|
|
if arg in kwargs:
|
|
data['network'][arg] = kwargs[arg]
|
|
network_req = self.new_create_request('networks', data, fmt)
|
|
if set_context and tenant_id:
|
|
# create a specific auth context for this request
|
|
network_req.environ['neutron.context'] = context.Context(
|
|
'', tenant_id)
|
|
return network_req.get_response(self.api)
|
|
|
|
def _create_l3_ext_network(self, physical_network='abc'):
|
|
name = 'l3_ext_net'
|
|
net_type = utils.NetworkTypes.L3_EXT
|
|
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
return self.network(name=name,
|
|
router__external=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK))
|
|
|
|
def _initialize_azs(self):
|
|
self.plugin.init_availability_zones()
|
|
self.plugin._init_default_config()
|
|
|
|
|
|
class NsxPTestNetworks(test_db_base_plugin_v2.TestNetworksV2,
|
|
NsxPPluginTestCaseMixin):
|
|
|
|
def setUp(self, plugin=PLUGIN_NAME,
|
|
ext_mgr=None,
|
|
service_plugins=None):
|
|
# add vlan transparent to the configuration
|
|
cfg.CONF.set_override('vlan_transparent', True)
|
|
super(NsxPTestNetworks, self).setUp(plugin=plugin,
|
|
ext_mgr=ext_mgr)
|
|
|
|
def tearDown(self):
|
|
super(NsxPTestNetworks, self).tearDown()
|
|
|
|
def test_create_provider_flat_network(self):
|
|
providernet_args = {pnet.NETWORK_TYPE: 'flat'}
|
|
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.create_or_overwrite',
|
|
side_effect=_return_id_key) as nsx_create, \
|
|
mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.delete') as nsx_delete, \
|
|
mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicyTransportZoneApi.get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN), \
|
|
self.network(name='flat_net',
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE, )) as net:
|
|
self.assertEqual('flat', net['network'].get(pnet.NETWORK_TYPE))
|
|
# make sure the network is created at the backend
|
|
nsx_create.assert_called_once()
|
|
|
|
# Delete the network and make sure it is deleted from the backend
|
|
req = self.new_delete_request('networks', net['network']['id'])
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
|
nsx_delete.assert_called_once()
|
|
|
|
def test_create_provider_flat_network_with_physical_net(self):
|
|
physical_network = DEFAULT_TIER0_ROUTER_UUID
|
|
providernet_args = {pnet.NETWORK_TYPE: 'flat',
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
with mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.NsxPolicyTransportZoneApi.'
|
|
'get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN), \
|
|
self.network(name='flat_net',
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK)) as net:
|
|
self.assertEqual('flat', net['network'].get(pnet.NETWORK_TYPE))
|
|
|
|
def test_create_provider_flat_network_with_vlan(self):
|
|
providernet_args = {pnet.NETWORK_TYPE: 'flat',
|
|
pnet.SEGMENTATION_ID: 11}
|
|
with mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.NsxPolicyTransportZoneApi.'
|
|
'get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN):
|
|
result = self._create_network(fmt='json', name='bad_flat_net',
|
|
admin_state_up=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.SEGMENTATION_ID))
|
|
data = self.deserialize('json', result)
|
|
# should fail
|
|
self.assertEqual('InvalidInput', data['NeutronError']['type'])
|
|
|
|
def test_create_provider_geneve_network(self):
|
|
providernet_args = {pnet.NETWORK_TYPE: 'geneve'}
|
|
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.create_or_overwrite',
|
|
side_effect=_return_id_key) as nsx_create, \
|
|
mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.delete') as nsx_delete, \
|
|
self.network(name='geneve_net',
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE, )) as net:
|
|
self.assertEqual('geneve', net['network'].get(pnet.NETWORK_TYPE))
|
|
# make sure the network is created at the backend
|
|
nsx_create.assert_called_once()
|
|
|
|
# Delete the network and make sure it is deleted from the backend
|
|
req = self.new_delete_request('networks', net['network']['id'])
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
|
nsx_delete.assert_called_once()
|
|
|
|
def test_create_provider_geneve_network_with_physical_net(self):
|
|
physical_network = DEFAULT_TIER0_ROUTER_UUID
|
|
providernet_args = {pnet.NETWORK_TYPE: 'geneve',
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
with mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.NsxPolicyTransportZoneApi.'
|
|
'get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_OVERLAY),\
|
|
self.network(name='geneve_net',
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE, )) as net:
|
|
self.assertEqual('geneve', net['network'].get(pnet.NETWORK_TYPE))
|
|
|
|
def test_create_provider_geneve_network_with_vlan(self):
|
|
providernet_args = {pnet.NETWORK_TYPE: 'geneve',
|
|
pnet.SEGMENTATION_ID: 11}
|
|
with mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.NsxPolicyTransportZoneApi.'
|
|
'get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_OVERLAY):
|
|
result = self._create_network(fmt='json', name='bad_geneve_net',
|
|
admin_state_up=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.SEGMENTATION_ID))
|
|
data = self.deserialize('json', result)
|
|
# should fail
|
|
self.assertEqual('InvalidInput', data['NeutronError']['type'])
|
|
|
|
def test_create_provider_vlan_network(self):
|
|
providernet_args = {pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.SEGMENTATION_ID: 11}
|
|
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.create_or_overwrite',
|
|
side_effect=_return_id_key) as nsx_create, \
|
|
mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.delete') as nsx_delete, \
|
|
mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicyTransportZoneApi.get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN), \
|
|
self.network(name='vlan_net',
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.SEGMENTATION_ID)) as net:
|
|
self.assertEqual('vlan', net['network'].get(pnet.NETWORK_TYPE))
|
|
# make sure the network is created at the backend
|
|
nsx_create.assert_called_once()
|
|
|
|
# Delete the network and make sure it is deleted from the backend
|
|
req = self.new_delete_request('networks', net['network']['id'])
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
|
nsx_delete.assert_called_once()
|
|
|
|
def test_create_provider_nsx_network(self):
|
|
physical_network = 'Fake logical switch'
|
|
providernet_args = {pnet.NETWORK_TYPE: 'nsx-net',
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
|
|
with mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.NsxPolicySegmentApi.'
|
|
'create_or_overwrite',
|
|
side_effect=nsxlib_exc.ResourceNotFound) as nsx_create, \
|
|
mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicySegmentApi.delete') as nsx_delete, \
|
|
self.network(name='nsx_net',
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK)) as net:
|
|
self.assertEqual('nsx-net', net['network'].get(pnet.NETWORK_TYPE))
|
|
self.assertEqual(physical_network,
|
|
net['network'].get(pnet.PHYSICAL_NETWORK))
|
|
# make sure the network is NOT created at the backend
|
|
nsx_create.assert_not_called()
|
|
|
|
# Delete the network. It should NOT deleted from the backend
|
|
req = self.new_delete_request('networks', net['network']['id'])
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
|
nsx_delete.assert_not_called()
|
|
|
|
def test_create_provider_bad_nsx_network(self):
|
|
physical_network = 'Bad logical switch'
|
|
providernet_args = {pnet.NETWORK_TYPE: 'nsx-net',
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
with mock.patch(
|
|
"vmware_nsxlib.v3.policy.core_resources.NsxPolicySegmentApi.get",
|
|
side_effect=nsxlib_exc.ResourceNotFound):
|
|
result = self._create_network(fmt='json', name='bad_nsx_net',
|
|
admin_state_up=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK))
|
|
data = self.deserialize('json', result)
|
|
# should fail
|
|
self.assertEqual('InvalidInput', data['NeutronError']['type'])
|
|
|
|
def _test_transparent_vlan_net(self, net_type, tz_type, should_succeed):
|
|
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
|
vlan_apidef.VLANTRANSPARENT: True}
|
|
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicyTransportZoneApi.get_transport_type',
|
|
return_value=tz_type):
|
|
result = self._create_network(fmt='json', name='vlan_net',
|
|
admin_state_up=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(
|
|
pnet.NETWORK_TYPE,
|
|
vlan_apidef.VLANTRANSPARENT))
|
|
data = self.deserialize('json', result)
|
|
if should_succeed:
|
|
self.assertEqual(net_type,
|
|
data['network'].get(pnet.NETWORK_TYPE))
|
|
self.assertTrue(
|
|
data['network'].get(vlan_apidef.VLANTRANSPARENT))
|
|
else:
|
|
self.assertEqual('InvalidInput', data['NeutronError']['type'])
|
|
|
|
def test_create_non_provider_network_with_transparent(self):
|
|
self._test_transparent_vlan_net(
|
|
net_type="",
|
|
tz_type=nsx_constants.TRANSPORT_TYPE_OVERLAY,
|
|
should_succeed=False)
|
|
|
|
def test_create_provider_overlay_network_with_transparent(self):
|
|
self._test_transparent_vlan_net(
|
|
net_type=utils.NsxV3NetworkTypes.GENEVE,
|
|
tz_type=nsx_constants.TRANSPORT_TYPE_OVERLAY,
|
|
should_succeed=True)
|
|
|
|
def test_create_provider_flat_network_with_transparent(self):
|
|
self._test_transparent_vlan_net(
|
|
net_type=utils.NsxV3NetworkTypes.FLAT,
|
|
tz_type=nsx_constants.TRANSPORT_TYPE_VLAN,
|
|
should_succeed=True)
|
|
|
|
def test_create_provider_vlan_network_with_transparent(self):
|
|
self._test_transparent_vlan_net(
|
|
net_type=utils.NsxV3NetworkTypes.VLAN,
|
|
tz_type=nsx_constants.TRANSPORT_TYPE_VLAN,
|
|
should_succeed=True)
|
|
|
|
def test_network_update_external_failure(self):
|
|
data = {'network': {'name': 'net1',
|
|
'router:external': 'True',
|
|
'tenant_id': 'tenant_one',
|
|
'provider:physical_network': 'stam'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
network = self.deserialize(self.fmt,
|
|
network_req.get_response(self.api))
|
|
ext_net_id = network['network']['id']
|
|
|
|
# should fail to update the network to non-external
|
|
args = {'network': {'router:external': 'False'}}
|
|
req = self.new_update_request('networks', args,
|
|
ext_net_id, fmt='json')
|
|
res = self.deserialize('json', req.get_response(self.api))
|
|
self.assertEqual('InvalidInput',
|
|
res['NeutronError']['type'])
|
|
|
|
@mock.patch.object(nsx_plugin.NsxPolicyPlugin,
|
|
'validate_availability_zones')
|
|
def test_create_network_with_availability_zone(self, mock_validate_az):
|
|
name = 'net-with-zone'
|
|
zone = ['zone1']
|
|
|
|
mock_validate_az.return_value = None
|
|
with self.network(name=name, availability_zone_hints=zone) as net:
|
|
az_hints = net['network']['availability_zone_hints']
|
|
self.assertListEqual(az_hints, zone)
|
|
|
|
def test_create_net_with_qos(self):
|
|
policy_id = uuidutils.generate_uuid()
|
|
data = {'network': {
|
|
'tenant_id': self._tenant_id,
|
|
'qos_policy_id': policy_id,
|
|
'name': 'qos_net',
|
|
'admin_state_up': True,
|
|
'shared': False}
|
|
}
|
|
dummy = mock.Mock()
|
|
dummy.id = policy_id
|
|
with mock.patch.object(self.plugin, '_validate_qos_policy_id'),\
|
|
mock.patch.object(obj_reg.load_class('QosPolicy'),
|
|
'get_network_policy',
|
|
return_value=dummy):
|
|
net = self.plugin.create_network(self.ctx, data)
|
|
self.assertEqual(policy_id, net['qos_policy_id'])
|
|
net = self.plugin.get_network(self.ctx, net['id'])
|
|
self.assertEqual(policy_id, net['qos_policy_id'])
|
|
|
|
def test_update_net_with_qos(self):
|
|
data = {'network': {
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'qos_net',
|
|
'admin_state_up': True,
|
|
'shared': False}
|
|
}
|
|
net = self.plugin.create_network(self.ctx, data)
|
|
policy_id = uuidutils.generate_uuid()
|
|
data['network']['qos_policy_id'] = policy_id
|
|
dummy = mock.Mock()
|
|
dummy.id = policy_id
|
|
with mock.patch.object(self.plugin, '_validate_qos_policy_id'),\
|
|
mock.patch.object(obj_reg.load_class('QosPolicy'),
|
|
'get_network_policy',
|
|
return_value=dummy):
|
|
res = self.plugin.update_network(self.ctx, net['id'], data)
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
res = self.plugin.get_network(self.ctx, net['id'])
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
|
|
def test_create_ens_network_with_qos(self):
|
|
cfg.CONF.set_override('ens_support', True, 'nsx_v3')
|
|
mock_ens = mock.patch('vmware_nsxlib.v3.policy'
|
|
'.core_resources.NsxPolicyTransportZoneApi'
|
|
'.get_host_switch_mode', return_value='ENS')
|
|
mock_tz = mock.patch('vmware_nsxlib.v3'
|
|
'.core_resources.NsxLibLogicalSwitch.get',
|
|
return_value={'transport_zone_id': 'xxx'})
|
|
mock_tt = mock.patch('vmware_nsxlib.v3.policy'
|
|
'.core_resources.NsxPolicyTransportZoneApi'
|
|
'.get_transport_type', return_value='VLAN')
|
|
policy_id = uuidutils.generate_uuid()
|
|
data = {'network': {
|
|
'name': 'qos_net',
|
|
'tenant_id': 'some_tenant',
|
|
'provider:network_type': 'flat',
|
|
'provider:physical_network': 'xxx',
|
|
'admin_state_up': True,
|
|
'shared': False,
|
|
'qos_policy_id': policy_id,
|
|
'port_security_enabled': False}}
|
|
with mock_ens, mock_tz, mock_tt, mock.patch.object(
|
|
self.plugin, '_validate_qos_policy_id'):
|
|
res = self.plugin.create_network(context.get_admin_context(), data)
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
|
|
def test_update_ens_network_with_qos(self):
|
|
cfg.CONF.set_override('ens_support', True, 'nsx_v3')
|
|
mock_ens = mock.patch('vmware_nsxlib.v3.policy'
|
|
'.core_resources.NsxPolicyTransportZoneApi'
|
|
'.get_host_switch_mode', return_value='ENS')
|
|
mock_tz = mock.patch('vmware_nsxlib.v3'
|
|
'.core_resources.NsxLibLogicalSwitch.get',
|
|
return_value={'transport_zone_id': 'xxx'})
|
|
mock_tt = mock.patch('vmware_nsxlib.v3.policy'
|
|
'.core_resources.NsxPolicyTransportZoneApi'
|
|
'.get_transport_type', return_value='VLAN')
|
|
data = {'network': {
|
|
'name': 'qos_net',
|
|
'tenant_id': 'some_tenant',
|
|
'provider:network_type': 'flat',
|
|
'provider:physical_network': 'xxx',
|
|
'admin_state_up': True,
|
|
'shared': False,
|
|
'port_security_enabled': False}}
|
|
with mock_ens, mock_tz, mock_tt,\
|
|
mock.patch.object(self.plugin, '_validate_qos_policy_id'):
|
|
network = self.plugin.create_network(context.get_admin_context(),
|
|
data)
|
|
policy_id = uuidutils.generate_uuid()
|
|
data = {'network': {
|
|
'id': network['id'],
|
|
'admin_state_up': True,
|
|
'shared': False,
|
|
'port_security_enabled': False,
|
|
'tenant_id': 'some_tenant',
|
|
'qos_policy_id': policy_id}}
|
|
res = self.plugin.update_network(
|
|
context.get_admin_context(),
|
|
network['id'], data)
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
|
|
|
|
class NsxPTestPorts(common_v3.NsxV3TestPorts,
|
|
common_v3.NsxV3SubnetMixin,
|
|
NsxPPluginTestCaseMixin):
|
|
def setUp(self, **kwargs):
|
|
super(NsxPTestPorts, self).setUp(**kwargs)
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_create_port_with_multiple_ipv4_and_ipv6_subnets(self):
|
|
return super(
|
|
NsxPTestPorts,
|
|
self).test_create_port_with_multiple_ipv4_and_ipv6_subnets
|
|
|
|
def test_ip_allocation_for_ipv6_2_subnet_slaac_mode(self):
|
|
self.skipTest('No DHCP v6 Support yet')
|
|
|
|
def test_update_port_with_ipv6_slaac_subnet_in_fixed_ips(self):
|
|
self.skipTest('No DHCP v6 Support yet')
|
|
|
|
def test_update_port_excluding_ipv6_slaac_subnet_from_fixed_ips(self):
|
|
self.skipTest('No DHCP v6 Support yet')
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_requested_subnet_id_v4_and_v6(self):
|
|
return super(NsxPTestPorts, self).test_requested_subnet_id_v4_and_v6()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_requested_ips_only(self):
|
|
return super(NsxPTestPorts, self).test_requested_ips_only()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports_with_sort_emulated(self):
|
|
return super(NsxPTestPorts,
|
|
self).test_list_ports_with_sort_emulated()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports_with_pagination_native(self):
|
|
return super(NsxPTestPorts,
|
|
self).test_list_ports_with_pagination_native()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports_for_network_owner(self):
|
|
return super(NsxPTestPorts, self).test_list_ports_for_network_owner()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports_public_network(self):
|
|
return super(NsxPTestPorts, self).test_list_ports_public_network()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports(self):
|
|
return super(NsxPTestPorts, self).test_list_ports()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_get_ports_count(self):
|
|
return super(NsxPTestPorts, self).test_get_ports_count()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports_with_sort_native(self):
|
|
return super(NsxPTestPorts, self).test_list_ports_with_sort_native()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_list_ports_with_pagination_emulated(self):
|
|
return super(NsxPTestPorts,
|
|
self).test_list_ports_with_pagination_emulated()
|
|
|
|
def test_update_port_delete_ip(self):
|
|
# This test case overrides the default because the nsx plugin
|
|
# implements port_security/security groups and it is not allowed
|
|
# to remove an ip address from a port unless the security group
|
|
# is first removed.
|
|
with self.subnet() as subnet:
|
|
with self.port(subnet=subnet) as port:
|
|
data = {'port': {'admin_state_up': False,
|
|
'fixed_ips': [],
|
|
secgrp.SECURITYGROUPS: []}}
|
|
req = self.new_update_request('ports',
|
|
data, port['port']['id'])
|
|
res = self.deserialize('json', req.get_response(self.api))
|
|
self.assertEqual(res['port']['admin_state_up'],
|
|
data['port']['admin_state_up'])
|
|
self.assertEqual(res['port']['fixed_ips'],
|
|
data['port']['fixed_ips'])
|
|
|
|
def test_create_port_with_qos(self):
|
|
with self.network() as network:
|
|
policy_id = uuidutils.generate_uuid()
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'qos_policy_id': policy_id,
|
|
'name': 'qos_port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'mac_address': '00:00:00:00:00:01'}
|
|
}
|
|
with mock.patch.object(self.plugin, '_validate_qos_policy_id'):
|
|
port = self.plugin.create_port(self.ctx, data)
|
|
self.assertEqual(policy_id, port['qos_policy_id'])
|
|
# Get port should also return the qos policy id
|
|
with mock.patch('vmware_nsx.services.qos.common.utils.'
|
|
'get_port_policy_id',
|
|
return_value=policy_id):
|
|
port = self.plugin.get_port(self.ctx, port['id'])
|
|
self.assertEqual(policy_id, port['qos_policy_id'])
|
|
|
|
def test_update_port_with_qos(self):
|
|
with self.network() as network:
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'qos_port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'mac_address': '00:00:00:00:00:01'}
|
|
}
|
|
port = self.plugin.create_port(self.ctx, data)
|
|
policy_id = uuidutils.generate_uuid()
|
|
data['port']['qos_policy_id'] = policy_id
|
|
with mock.patch.object(self.plugin, '_validate_qos_policy_id'):
|
|
res = self.plugin.update_port(self.ctx, port['id'], data)
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
# Get port should also return the qos policy id
|
|
with mock.patch('vmware_nsx.services.qos.common.utils.'
|
|
'get_port_policy_id',
|
|
return_value=policy_id):
|
|
res = self.plugin.get_port(self.ctx, port['id'])
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
|
|
# now remove the qos from the port
|
|
data['port']['qos_policy_id'] = None
|
|
res = self.plugin.update_port(self.ctx, port['id'], data)
|
|
self.assertIsNone(res['qos_policy_id'])
|
|
|
|
def test_create_ext_port_with_qos_fail(self):
|
|
with self._create_l3_ext_network() as network:
|
|
with self.subnet(network=network, cidr='10.0.0.0/24',
|
|
enable_dhcp=False),\
|
|
mock.patch.object(self.plugin, '_validate_qos_policy_id'):
|
|
policy_id = uuidutils.generate_uuid()
|
|
data = {'port': {'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'qos_policy_id': policy_id}}
|
|
# Cannot add qos policy to a router port
|
|
self.assertRaises(n_exc.InvalidInput,
|
|
self.plugin.create_port, self.ctx, data)
|
|
|
|
def _test_create_illegal_port_with_qos_fail(self, device_owner):
|
|
with self.network() as network:
|
|
with self.subnet(network=network, cidr='10.0.0.0/24'),\
|
|
mock.patch.object(self.plugin, '_validate_qos_policy_id'):
|
|
policy_id = uuidutils.generate_uuid()
|
|
data = {'port': {'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'device_owner': device_owner,
|
|
'qos_policy_id': policy_id}}
|
|
# Cannot add qos policy to this type of port
|
|
self.assertRaises(n_exc.InvalidInput,
|
|
self.plugin.create_port, self.ctx, data)
|
|
|
|
def test_create_port_ens_with_qos_fail(self):
|
|
with self.network() as network:
|
|
with self.subnet(network=network, cidr='10.0.0.0/24'):
|
|
policy_id = uuidutils.generate_uuid()
|
|
mock_ens = mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicyTransportZoneApi.get_host_switch_mode',
|
|
return_value='ENS')
|
|
mock_tz = mock.patch(
|
|
'vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get',
|
|
return_value={'transport_zone_id': 'xxx'})
|
|
mock_tt = mock.patch(
|
|
'vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicyTransportZoneApi.get_transport_type',
|
|
return_value='VLAN')
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'qos_port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'port_security_enabled': False,
|
|
'mac_address': '00:00:00:00:00:01',
|
|
'qos_policy_id': policy_id}
|
|
}
|
|
# Cannot add qos policy to this type of port
|
|
with mock_ens, mock_tz, mock_tt, \
|
|
mock.patch.object(self.plugin, '_validate_qos_policy_id'):
|
|
res = self.plugin.create_port(self.ctx, data)
|
|
self.assertEqual(policy_id, res['qos_policy_id'])
|
|
|
|
def test_create_port_with_mac_learning_true(self):
|
|
plugin = directory.get_plugin()
|
|
ctx = context.get_admin_context()
|
|
with self.network() as network:
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'port_security_enabled': False,
|
|
'mac_address': '00:00:00:00:00:01',
|
|
'mac_learning_enabled': True}
|
|
}
|
|
port = plugin.create_port(ctx, data)
|
|
self.assertTrue(port['mac_learning_enabled'])
|
|
|
|
def test_create_port_with_mac_learning_false(self):
|
|
plugin = directory.get_plugin()
|
|
ctx = context.get_admin_context()
|
|
with self.network() as network:
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'port_security_enabled': False,
|
|
'mac_address': '00:00:00:00:00:01',
|
|
'mac_learning_enabled': False}
|
|
}
|
|
port = plugin.create_port(ctx, data)
|
|
self.assertFalse(port['mac_learning_enabled'])
|
|
|
|
def test_update_port_with_mac_learning_true(self):
|
|
plugin = directory.get_plugin()
|
|
ctx = context.get_admin_context()
|
|
with self.network() as network:
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'port_security_enabled': False,
|
|
'mac_address': '00:00:00:00:00:01'}
|
|
}
|
|
port = plugin.create_port(ctx, data)
|
|
data['port']['mac_learning_enabled'] = True
|
|
update_res = plugin.update_port(ctx, port['id'], data)
|
|
self.assertTrue(update_res['mac_learning_enabled'])
|
|
|
|
def test_update_port_with_mac_learning_false(self):
|
|
plugin = directory.get_plugin()
|
|
ctx = context.get_admin_context()
|
|
with self.network() as network:
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': 'fake_owner',
|
|
'fixed_ips': [],
|
|
'port_security_enabled': False,
|
|
'mac_address': '00:00:00:00:00:01'}
|
|
}
|
|
port = plugin.create_port(ctx, data)
|
|
data['port']['mac_learning_enabled'] = False
|
|
update_res = plugin.update_port(ctx, port['id'], data)
|
|
self.assertFalse(update_res['mac_learning_enabled'])
|
|
|
|
def test_update_port_with_mac_learning_failes(self):
|
|
plugin = directory.get_plugin()
|
|
ctx = context.get_admin_context()
|
|
with self.network() as network:
|
|
data = {'port': {
|
|
'network_id': network['network']['id'],
|
|
'tenant_id': self._tenant_id,
|
|
'name': 'port',
|
|
'admin_state_up': True,
|
|
'device_id': 'fake_device',
|
|
'device_owner': constants.DEVICE_OWNER_FLOATINGIP,
|
|
'fixed_ips': [],
|
|
'port_security_enabled': False,
|
|
'mac_address': '00:00:00:00:00:01'}
|
|
}
|
|
port = plugin.create_port(ctx, data)
|
|
data['port']['mac_learning_enabled'] = True
|
|
self.assertRaises(
|
|
n_exc.InvalidInput,
|
|
plugin.update_port, ctx, port['id'], data)
|
|
|
|
def _create_l3_ext_network(
|
|
self, physical_network=DEFAULT_TIER0_ROUTER_UUID):
|
|
name = 'l3_ext_net'
|
|
net_type = utils.NetworkTypes.L3_EXT
|
|
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
return self.network(name=name,
|
|
router__external=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK))
|
|
|
|
def test_fail_create_port_with_ext_net(self):
|
|
expected_error = 'InvalidInput'
|
|
with self._create_l3_ext_network() as network:
|
|
with self.subnet(network=network, cidr='10.0.0.0/24',
|
|
enable_dhcp=False):
|
|
device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X'
|
|
res = self._create_port(self.fmt,
|
|
network['network']['id'],
|
|
exc.HTTPBadRequest.code,
|
|
device_owner=device_owner)
|
|
data = self.deserialize(self.fmt, res)
|
|
self.assertEqual(expected_error, data['NeutronError']['type'])
|
|
|
|
def test_fail_update_port_with_ext_net(self):
|
|
with self._create_l3_ext_network() as network:
|
|
with self.subnet(network=network, cidr='10.0.0.0/24',
|
|
enable_dhcp=False) as subnet:
|
|
with self.port(subnet=subnet) as port:
|
|
device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X'
|
|
data = {'port': {'device_owner': device_owner}}
|
|
req = self.new_update_request('ports',
|
|
data, port['port']['id'])
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(exc.HTTPBadRequest.code,
|
|
res.status_int)
|
|
|
|
def _test_create_direct_network(self, vlan_id=0):
|
|
net_type = vlan_id and 'vlan' or 'flat'
|
|
name = 'direct_net'
|
|
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
|
pnet.PHYSICAL_NETWORK: 'tzuuid'}
|
|
if vlan_id:
|
|
providernet_args[pnet.SEGMENTATION_ID] = vlan_id
|
|
|
|
mock_tt = mock.patch('vmware_nsxlib.v3.policy'
|
|
'.core_resources.NsxPolicyTransportZoneApi'
|
|
'.get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN)
|
|
mock_tt.start()
|
|
return self.network(name=name,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK,
|
|
pnet.SEGMENTATION_ID))
|
|
|
|
def _test_create_port_vnic_direct(self, vlan_id):
|
|
with mock.patch('vmware_nsxlib.v3.policy.core_resources.'
|
|
'NsxPolicyTransportZoneApi.get_transport_type',
|
|
return_value=nsx_constants.TRANSPORT_TYPE_VLAN),\
|
|
self._test_create_direct_network(vlan_id=vlan_id) as network:
|
|
# Check that port security conflicts
|
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
|
psec.PORTSECURITY: True}
|
|
net_id = network['network']['id']
|
|
res = self._create_port(self.fmt, net_id=net_id,
|
|
arg_list=(portbindings.VNIC_TYPE,
|
|
psec.PORTSECURITY),
|
|
**kwargs)
|
|
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
|
|
|
# Check that security group conflicts
|
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
|
'security_groups': [
|
|
'4cd70774-cc67-4a87-9b39-7d1db38eb087'],
|
|
psec.PORTSECURITY: False}
|
|
net_id = network['network']['id']
|
|
res = self._create_port(self.fmt, net_id=net_id,
|
|
arg_list=(portbindings.VNIC_TYPE,
|
|
psec.PORTSECURITY),
|
|
**kwargs)
|
|
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
|
|
|
# All is kosher so we can create the port
|
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT}
|
|
net_id = network['network']['id']
|
|
res = self._create_port(self.fmt, net_id=net_id,
|
|
arg_list=(portbindings.VNIC_TYPE,),
|
|
**kwargs)
|
|
port = self.deserialize('json', res)
|
|
self.assertEqual("direct", port['port'][portbindings.VNIC_TYPE])
|
|
self.assertEqual("dvs", port['port'][portbindings.VIF_TYPE])
|
|
self.assertEqual(
|
|
vlan_id,
|
|
port['port'][portbindings.VIF_DETAILS]['segmentation-id'])
|
|
|
|
# try to get the same port
|
|
req = self.new_show_request('ports', port['port']['id'], self.fmt)
|
|
sport = self.deserialize(self.fmt, req.get_response(self.api))
|
|
self.assertEqual("dvs", sport['port'][portbindings.VIF_TYPE])
|
|
self.assertEqual("direct", sport['port'][portbindings.VNIC_TYPE])
|
|
self.assertEqual(
|
|
vlan_id,
|
|
sport['port'][portbindings.VIF_DETAILS]['segmentation-id'])
|
|
|
|
def test_create_port_vnic_direct_flat(self):
|
|
self._test_create_port_vnic_direct(0)
|
|
|
|
def test_create_port_vnic_direct_vlan(self):
|
|
self._test_create_port_vnic_direct(10)
|
|
|
|
def test_create_port_vnic_direct_invalid_network(self):
|
|
with self.network(name='not vlan/flat') as net:
|
|
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
|
|
psec.PORTSECURITY: False}
|
|
net_id = net['network']['id']
|
|
res = self._create_port(self.fmt, net_id=net_id,
|
|
arg_list=(portbindings.VNIC_TYPE,
|
|
psec.PORTSECURITY),
|
|
**kwargs)
|
|
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
|
|
|
|
def test_update_vnic_direct(self):
|
|
with self._test_create_direct_network(vlan_id=7) as network:
|
|
with self.subnet(network=network) as subnet:
|
|
with self.port(subnet=subnet) as port:
|
|
# need to do two updates as the update for port security
|
|
# disabled requires that it can only change 2 items
|
|
data = {'port': {psec.PORTSECURITY: False,
|
|
'security_groups': []}}
|
|
req = self.new_update_request('ports',
|
|
data, port['port']['id'])
|
|
res = self.deserialize('json', req.get_response(self.api))
|
|
self.assertEqual(portbindings.VNIC_NORMAL,
|
|
res['port'][portbindings.VNIC_TYPE])
|
|
|
|
data = {'port': {portbindings.VNIC_TYPE:
|
|
portbindings.VNIC_DIRECT}}
|
|
|
|
req = self.new_update_request('ports',
|
|
data, port['port']['id'])
|
|
res = self.deserialize('json', req.get_response(self.api))
|
|
self.assertEqual(portbindings.VNIC_DIRECT,
|
|
res['port'][portbindings.VNIC_TYPE])
|
|
|
|
def test_port_invalid_vnic_type(self):
|
|
with self._test_create_direct_network(vlan_id=7) as network:
|
|
kwargs = {portbindings.VNIC_TYPE: 'invalid',
|
|
psec.PORTSECURITY: False}
|
|
net_id = network['network']['id']
|
|
res = self._create_port(self.fmt, net_id=net_id,
|
|
arg_list=(portbindings.VNIC_TYPE,
|
|
psec.PORTSECURITY),
|
|
**kwargs)
|
|
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
|
|
|
|
|
|
class NsxPTestSubnets(common_v3.NsxV3TestSubnets,
|
|
NsxPPluginTestCaseMixin):
|
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
|
|
super(NsxPTestSubnets, self).setUp(plugin=plugin, ext_mgr=ext_mgr)
|
|
self.force_slaac = False
|
|
|
|
def _create_subnet_bulk(self, fmt, number, net_id, name,
|
|
ip_version=4, **kwargs):
|
|
base_data = {'subnet': {'network_id': net_id,
|
|
'ip_version': ip_version,
|
|
'enable_dhcp': False,
|
|
'tenant_id': self._tenant_id}}
|
|
if 'ipv6_mode' in kwargs:
|
|
base_data['subnet']['ipv6_ra_mode'] = kwargs['ipv6_mode']
|
|
base_data['subnet']['ipv6_address_mode'] = kwargs['ipv6_mode']
|
|
# auto-generate cidrs as they should not overlap
|
|
base_cidr = "10.0.%s.0/24"
|
|
if ip_version == constants.IP_VERSION_6:
|
|
base_cidr = "fd%s::/64"
|
|
|
|
# auto-generate cidrs as they should not overlap
|
|
overrides = dict((k, v)
|
|
for (k, v) in zip(range(number),
|
|
[{'cidr': base_cidr % num}
|
|
for num in range(number)]))
|
|
kwargs.update({'override': overrides})
|
|
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
|
|
|
|
def _test_create_subnet(self, network=None, expected=None, **kwargs):
|
|
# Until DHCPv6 is supported, switch all test to slaac-only
|
|
if (self.force_slaac and
|
|
'ipv6_ra_mode' in kwargs and 'ipv6_address_mode' in kwargs):
|
|
kwargs['ipv6_ra_mode'] = constants.IPV6_SLAAC
|
|
kwargs['ipv6_address_mode'] = constants.IPV6_SLAAC
|
|
|
|
return super(NsxPTestSubnets,
|
|
self)._test_create_subnet(network, expected, **kwargs)
|
|
|
|
def test_create_external_subnet_with_conflicting_t0_address(self):
|
|
with self._create_l3_ext_network() as network:
|
|
data = {'subnet': {'network_id': network['network']['id'],
|
|
'cidr': '172.20.1.0/24',
|
|
'name': 'sub1',
|
|
'enable_dhcp': False,
|
|
'dns_nameservers': None,
|
|
'allocation_pools': None,
|
|
'tenant_id': 'tenant_one',
|
|
'host_routes': None,
|
|
'ip_version': 4}}
|
|
with mock.patch.object(self.plugin.nsxpolicy.tier0,
|
|
'get_uplink_cidrs',
|
|
return_value=['172.20.1.60/24']):
|
|
self.assertRaises(n_exc.InvalidInput,
|
|
self.plugin.create_subnet,
|
|
context.get_admin_context(), data)
|
|
|
|
def test_create_external_subnet_with_non_conflicting_t0_address(self):
|
|
with self._create_l3_ext_network() as network:
|
|
data = {'subnet': {'network_id': network['network']['id'],
|
|
'cidr': '172.20.1.0/24',
|
|
'name': 'sub1',
|
|
'enable_dhcp': False,
|
|
'dns_nameservers': None,
|
|
'allocation_pools': None,
|
|
'tenant_id': 'tenant_one',
|
|
'host_routes': None,
|
|
'ip_version': 4}}
|
|
with mock.patch.object(self.plugin.nsxpolicy.tier0,
|
|
'get_uplink_ips',
|
|
return_value=['172.20.2.60']):
|
|
self.plugin.create_subnet(
|
|
context.get_admin_context(), data)
|
|
|
|
@common_v3.with_disable_dhcp_once
|
|
def test_create_subnet_ipv6_slaac_with_port_on_network(self):
|
|
super(NsxPTestSubnets,
|
|
self).test_create_subnet_ipv6_slaac_with_port_on_network()
|
|
|
|
|
|
class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|
NsxPPluginTestCaseMixin,
|
|
test_securitygroup.TestSecurityGroups,
|
|
test_securitygroup.SecurityGroupDBTestCase):
|
|
|
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
|
|
super(NsxPTestSecurityGroup, self).setUp(plugin=plugin,
|
|
ext_mgr=ext_mgr)
|
|
self.project_id = test_db_base_plugin_v2.TEST_TENANT_ID
|
|
# add provider group attributes
|
|
secgrp.Securitygroup().update_attributes_map(
|
|
provider_sg.EXTENDED_ATTRIBUTES_2_0)
|
|
|
|
def test_create_security_group_rule_icmp_with_type_and_code(self):
|
|
"""No non-zero icmp codes are currently supported by the NSX"""
|
|
self.skipTest('not supported')
|
|
|
|
def test_create_security_group_rule_icmp_with_type(self):
|
|
name = 'webservers'
|
|
description = 'my webservers'
|
|
with self.security_group(name, description) as sg:
|
|
security_group_id = sg['security_group']['id']
|
|
direction = "ingress"
|
|
remote_ip_prefix = "10.0.0.0/24"
|
|
protocol = "icmp"
|
|
# port_range_min (ICMP type) is greater than port_range_max
|
|
# (ICMP code) in order to confirm min <= max port check is
|
|
# not called for ICMP.
|
|
port_range_min = 14
|
|
port_range_max = None
|
|
keys = [('remote_ip_prefix', remote_ip_prefix),
|
|
('security_group_id', security_group_id),
|
|
('direction', direction),
|
|
('protocol', protocol),
|
|
('port_range_min', port_range_min),
|
|
('port_range_max', port_range_max)]
|
|
with self.security_group_rule(security_group_id, direction,
|
|
protocol, port_range_min,
|
|
port_range_max,
|
|
remote_ip_prefix) as rule:
|
|
for k, v, in keys:
|
|
self.assertEqual(rule['security_group_rule'][k], v)
|
|
|
|
@common_v3.with_no_dhcp_subnet
|
|
def test_list_ports_security_group(self):
|
|
return super(NsxPTestSecurityGroup,
|
|
self).test_list_ports_security_group()
|
|
|
|
@mock.patch.object(nsx_plugin.NsxPolicyPlugin, 'get_security_group')
|
|
def test_create_security_group_rule_with_invalid_tcp_or_udp_protocol(
|
|
self, get_mock):
|
|
super(NsxPTestSecurityGroup, self).\
|
|
test_create_security_group_rule_with_invalid_tcp_or_udp_protocol()
|
|
|
|
@mock.patch.object(nsx_plugin.NsxPolicyPlugin, 'get_security_group')
|
|
def test_create_security_group_source_group_ip_and_ip_prefix(
|
|
self, get_mock):
|
|
super(NsxPTestSecurityGroup, self).\
|
|
test_create_security_group_source_group_ip_and_ip_prefix()
|
|
|
|
def _create_default_sg(self):
|
|
self.plugin._ensure_default_security_group(
|
|
context.get_admin_context(), self.project_id)
|
|
|
|
def test_sg_create_on_nsx(self):
|
|
"""Verify that a group and comm-map are created for a new SG"""
|
|
# Make sure the default SG is created before testing
|
|
self._create_default_sg()
|
|
name = description = 'sg1'
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyGroupApi.create_or_overwrite_with_conditions"
|
|
) as group_create,\
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyCommunicationMapApi."
|
|
"create_with_entries") as comm_map_create,\
|
|
self.security_group(name, description) as sg:
|
|
sg_id = sg['security_group']['id']
|
|
nsx_name = utils.get_name_and_uuid(name, sg_id)
|
|
group_create.assert_called_once_with(
|
|
nsx_name, policy_constants.DEFAULT_DOMAIN, group_id=sg_id,
|
|
description=description,
|
|
conditions=[mock.ANY], tags=mock.ANY)
|
|
comm_map_create.assert_called_once_with(
|
|
nsx_name, policy_constants.DEFAULT_DOMAIN, map_id=sg_id,
|
|
description=description,
|
|
tags=mock.ANY,
|
|
entries=mock.ANY,
|
|
category=policy_constants.CATEGORY_ENVIRONMENT)
|
|
|
|
def _create_provider_security_group(self):
|
|
body = {'security_group': {'name': 'provider-deny',
|
|
'tenant_id': self._tenant_id,
|
|
'description': 'provider sg',
|
|
'provider': True}}
|
|
security_group_req = self.new_create_request('security-groups', body)
|
|
return self.deserialize(self.fmt,
|
|
security_group_req.get_response(self.ext_api))
|
|
|
|
def test_provider_sg_on_port(self):
|
|
psg = self._create_provider_security_group()
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentPortApi.create_or_overwrite"
|
|
) as port_create:
|
|
with self.port(tenant_id=self._tenant_id) as port:
|
|
# make sure the port has the provider sg
|
|
port_data = port['port']
|
|
self.assertEqual(1, len(port_data['provider_security_groups']))
|
|
self.assertEqual(psg['security_group']['id'],
|
|
port_data['provider_security_groups'][0])
|
|
|
|
# Make sure the correct security groups tags were set
|
|
port_create.assert_called_once()
|
|
actual_tags = port_create.call_args[1]['tags']
|
|
sg_tags = 0
|
|
psg_tag_found = False
|
|
for tag in actual_tags:
|
|
if tag['scope'] == 'os-security-group':
|
|
sg_tags += 1
|
|
if tag['tag'] == psg['security_group']['id']:
|
|
psg_tag_found = True
|
|
self.assertEqual(2, sg_tags)
|
|
self.assertTrue(psg_tag_found)
|
|
|
|
def test_remove_provider_sg_from_port(self):
|
|
psg = self._create_provider_security_group()
|
|
with self.port(tenant_id=self._tenant_id) as port:
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentPortApi.create_or_overwrite"
|
|
) as port_update:
|
|
# specifically remove the provider sg from the port
|
|
data = {'port': {'provider_security_groups': []}}
|
|
req = self.new_update_request('ports',
|
|
data, port['port']['id'])
|
|
res = self.deserialize('json', req.get_response(self.api))
|
|
self.assertEqual(0,
|
|
len(res['port']['provider_security_groups']))
|
|
# Make sure the correct security groups tags were set
|
|
port_update.assert_called_once()
|
|
actual_tags = port_update.call_args[1]['tags']
|
|
sg_tags = 0
|
|
psg_tag_found = False
|
|
for tag in actual_tags:
|
|
if tag['scope'] == 'os-security-group':
|
|
sg_tags += 1
|
|
if tag['tag'] == psg['security_group']['id']:
|
|
psg_tag_found = True
|
|
self.assertEqual(1, sg_tags)
|
|
self.assertFalse(psg_tag_found)
|
|
|
|
def test_sg_rule_create_on_nsx(self):
|
|
"""Verify that a comm-map entry is created for a new SG rule """
|
|
name = description = 'sg1'
|
|
direction = "ingress"
|
|
remote_ip_prefix = "10.0.0.0/24"
|
|
protocol = "tcp"
|
|
port_range_min = 80
|
|
port_range_max = 80
|
|
with self.security_group(name, description) as sg:
|
|
sg_id = sg['security_group']['id']
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyCommunicationMapApi.create_entry"
|
|
) as entry_create,\
|
|
self.security_group_rule(sg_id, direction,
|
|
protocol, port_range_min,
|
|
port_range_max,
|
|
remote_ip_prefix) as rule:
|
|
rule_id = rule['security_group_rule']['id']
|
|
scope = [self.plugin.nsxpolicy.group.get_path(
|
|
policy_constants.DEFAULT_DOMAIN, sg_id)]
|
|
entry_create.assert_called_once_with(
|
|
rule_id, policy_constants.DEFAULT_DOMAIN,
|
|
sg_id, entry_id=rule_id,
|
|
description='',
|
|
direction=nsx_constants.IN,
|
|
ip_protocol=nsx_constants.IPV4,
|
|
action=policy_constants.ACTION_ALLOW,
|
|
service_ids=mock.ANY,
|
|
source_groups=mock.ANY,
|
|
dest_groups=mock.ANY,
|
|
scope=scope,
|
|
logged=False,
|
|
tag=mock.ANY)
|
|
|
|
def test_create_security_group_rule_with_remote_group(self):
|
|
with self.security_group() as sg1, self.security_group() as sg2:
|
|
security_group_id = sg1['security_group']['id']
|
|
direction = "ingress"
|
|
remote_group_id = sg2['security_group']['id']
|
|
protocol = "tcp"
|
|
keys = [('remote_group_id', remote_group_id),
|
|
('security_group_id', security_group_id),
|
|
('direction', direction),
|
|
('protocol', protocol)]
|
|
with self.security_group_rule(
|
|
security_group_id, direction=direction, protocol=protocol,
|
|
remote_group_id=remote_group_id) as rule:
|
|
for k, v, in keys:
|
|
self.assertEqual(rule['security_group_rule'][k], v)
|
|
|
|
def test_delete_security_group_rule_with_remote_group(self):
|
|
com_plugin.subscribe()
|
|
with self.security_group() as sg1, self.security_group() as sg2:
|
|
security_group_id = sg1['security_group']['id']
|
|
direction = "ingress"
|
|
remote_group_id = sg2['security_group']['id']
|
|
protocol = "tcp"
|
|
with self.security_group_rule(
|
|
security_group_id, direction=direction, protocol=protocol,
|
|
remote_group_id=remote_group_id) as rule,\
|
|
mock.patch.object(
|
|
self.plugin, "delete_security_group_rule") as del_rule:
|
|
# delete sg2
|
|
self._delete('security-groups', remote_group_id,
|
|
exc.HTTPNoContent.code)
|
|
# verify the rule was deleted
|
|
del_rule.assert_called_once_with(
|
|
mock.ANY, rule["security_group_rule"]["id"])
|
|
|
|
|
|
class NsxPTestL3ExtensionManager(object):
|
|
|
|
def get_resources(self):
|
|
# Simulate extension of L3 attribute map
|
|
l3.L3().update_attributes_map(
|
|
l3_egm_apidef.RESOURCE_ATTRIBUTE_MAP)
|
|
l3.L3().update_attributes_map(
|
|
xroute_apidef.RESOURCE_ATTRIBUTE_MAP)
|
|
return (l3.L3.get_resources() +
|
|
address_scope.Address_scope.get_resources())
|
|
|
|
def get_actions(self):
|
|
return []
|
|
|
|
def get_request_extensions(self):
|
|
return []
|
|
|
|
|
|
class NsxPTestL3NatTest(common_v3.FixExternalNetBaseTest,
|
|
common_v3.NsxV3SubnetMixin,
|
|
NsxPPluginTestCaseMixin,
|
|
test_l3_plugin.L3BaseForIntTests,
|
|
test_address_scope.AddressScopeTestCase):
|
|
|
|
def setUp(self, *args, **kwargs):
|
|
cfg.CONF.set_override('api_extensions_path', vmware.NSXEXT_PATH)
|
|
cfg.CONF.set_default('max_routes', 3)
|
|
kwargs['ext_mgr'] = (kwargs.get('ext_mgr') or
|
|
NsxPTestL3ExtensionManager())
|
|
|
|
# Make sure the LB callback is not called on router deletion
|
|
self.lb_mock1 = mock.patch(
|
|
"vmware_nsx.services.lbaas.octavia.octavia_listener."
|
|
"NSXOctaviaListenerEndpoint._check_lb_service_on_router")
|
|
self.lb_mock1.start()
|
|
self.lb_mock2 = mock.patch(
|
|
"vmware_nsx.services.lbaas.octavia.octavia_listener."
|
|
"NSXOctaviaListenerEndpoint._check_lb_service_on_router_interface")
|
|
self.lb_mock2.start()
|
|
|
|
super(NsxPTestL3NatTest, self).setUp(*args, **kwargs)
|
|
self.original_subnet = self.subnet
|
|
self.original_network = self.network
|
|
|
|
self.plugin_instance = directory.get_plugin()
|
|
self._plugin_name = "%s.%s" % (
|
|
self.plugin_instance.__module__,
|
|
self.plugin_instance.__class__.__name__)
|
|
self._plugin_class = self.plugin_instance.__class__
|
|
|
|
def external_network(self, name='net1',
|
|
admin_state_up=True,
|
|
fmt=None, **kwargs):
|
|
if not name:
|
|
name = 'l3_ext_net'
|
|
physical_network = 'abc'
|
|
net_type = utils.NetworkTypes.L3_EXT
|
|
providernet_args = {pnet.NETWORK_TYPE: net_type,
|
|
pnet.PHYSICAL_NETWORK: physical_network}
|
|
return self.original_network(name=name,
|
|
admin_state_up=admin_state_up,
|
|
fmt=fmt,
|
|
router__external=True,
|
|
providernet_args=providernet_args,
|
|
arg_list=(pnet.NETWORK_TYPE,
|
|
pnet.PHYSICAL_NETWORK))
|
|
|
|
def test_floatingip_create_different_fixed_ip_same_port(self):
|
|
self.skipTest('Multiple fixed ips on a port are not supported')
|
|
|
|
def test_router_add_interface_multiple_ipv4_subnet_port_returns_400(self):
|
|
self.skipTest('Multiple fixed ips on a port are not supported')
|
|
|
|
def test_router_add_interface_multiple_ipv6_subnet_port(self):
|
|
self.skipTest('Multiple fixed ips on a port are not supported')
|
|
|
|
def test_floatingip_update_different_fixed_ip_same_port(self):
|
|
self.skipTest('Multiple fixed ips on a port are not supported')
|
|
|
|
def test_create_multiple_floatingips_same_fixed_ip_same_port(self):
|
|
self.skipTest('Multiple fixed ips on a port are not supported')
|
|
|
|
|
|
class NsxPTestL3NatTestCase(NsxPTestL3NatTest,
|
|
test_l3_plugin.L3NatDBIntTestCase,
|
|
test_ext_route.ExtraRouteDBTestCaseBase):
|
|
|
|
def setUp(self, *args, **kwargs):
|
|
super(NsxPTestL3NatTestCase, self).setUp(*args, **kwargs)
|
|
|
|
mock.patch.object(self.plugin.nsxpolicy, 'search_by_tags',
|
|
return_value={'results': []}).start()
|
|
|
|
def test__notify_gateway_port_ip_changed(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test__notify_gateway_port_ip_not_changed(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_floatingip_via_router_interface_returns_201(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_floatingip_via_router_interface_returns_404(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_network_update_external(self):
|
|
# This plugin does not support updating the external flag of a network
|
|
self.skipTest('not supported')
|
|
|
|
def test_network_update_external_failure(self):
|
|
# This plugin does not support updating the external flag of a network
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_gateway_dup_subnet1_returns_400(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_dup_subnet2_returns_400(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_ipv6_port_existing_network_returns_400(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_routes_update_for_multiple_routers(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_floatingip_multi_external_one_internal(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_floatingip_same_external_and_internal(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_route_update_with_external_route(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_floatingip_update_subnet_gateway_disabled(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_floatingip_update_to_same_port_id_twice(self):
|
|
self.skipTest('Plugin changes floating port status')
|
|
|
|
def test_router_add_interface_by_port_other_tenant_address_out_of_pool(
|
|
self):
|
|
# multiple fixed ips per port are not supported
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_by_port_other_tenant_address_in_pool(self):
|
|
# multiple fixed ips per port are not supported
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_by_port_admin_address_out_of_pool(self):
|
|
# multiple fixed ips per port are not supported
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_gateway_no_subnet(self):
|
|
self.skipTest('No support for no subnet gateway set')
|
|
|
|
def test_router_remove_ipv6_subnet_from_interface(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_multiple_ipv6_subnets_same_net(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_multiple_ipv4_subnets(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_update_gateway_upon_subnet_create_max_ips_ipv6(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_delete_dhcpv6_stateless_subnet_inuse_returns_409(self):
|
|
self.skipTest('not supported')
|
|
|
|
@common_v3.with_disable_dhcp
|
|
@common_v3.with_external_network
|
|
def test_router_update_gateway_upon_subnet_create_ipv6(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway_upon_subnet_create_ipv6()
|
|
|
|
def test_router_delete_ipv6_slaac_subnet_inuse_returns_409(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_gateway_multiple_subnets_ipv6(self):
|
|
self.skipTest('not supported')
|
|
|
|
def test_router_add_interface_ipv6_subnet(self):
|
|
self.skipTest('DHCPv6 not supported')
|
|
|
|
def test_slaac_profile_single_subnet(self):
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update") as t1_update:
|
|
|
|
with self.router() as r, self.network() as n:
|
|
with self.subnet(network=n, cidr='fd00::0/64',
|
|
gateway_ip='fd00::1', ip_version=6,
|
|
ipv6_address_mode='slaac',
|
|
ipv6_ra_mode='slaac') as s:
|
|
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s['subnet']['id'],
|
|
None)
|
|
# Validate T1 was updated with slaac profile
|
|
t1_update.assert_called_with(
|
|
r['router']['id'],
|
|
ipv6_ndra_profile_id='neutron-slaac-profile')
|
|
|
|
self._router_interface_action('remove',
|
|
r['router']['id'],
|
|
s['subnet']['id'],
|
|
None)
|
|
# Validate T1 was updated with default profile
|
|
t1_update.assert_called_with(
|
|
r['router']['id'],
|
|
ipv6_ndra_profile_id='neutron-no-slaac-profile')
|
|
|
|
def test_slaac_profile_dual_stack(self):
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update") as t1_update:
|
|
|
|
with self.router() as r,\
|
|
self.network() as n:
|
|
with self.subnet(network=n, cidr='2.3.3.0/24') as s1,\
|
|
self.subnet(network=n, cidr='fd10::0/64',
|
|
gateway_ip='fd10::1', ip_version=6,
|
|
ipv6_address_mode='slaac',
|
|
ipv6_ra_mode='slaac') as s2:
|
|
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s1['subnet']['id'],
|
|
None)
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s2['subnet']['id'],
|
|
None)
|
|
# Validate T1 was updated with slaac profile
|
|
t1_update.assert_called_with(
|
|
r['router']['id'],
|
|
ipv6_ndra_profile_id='neutron-slaac-profile')
|
|
|
|
# Remove non-slaac subnets first
|
|
self._router_interface_action('remove',
|
|
r['router']['id'],
|
|
s1['subnet']['id'],
|
|
None)
|
|
self._router_interface_action('remove',
|
|
r['router']['id'],
|
|
s2['subnet']['id'],
|
|
None)
|
|
|
|
# Validate T1 was updated with default profile
|
|
t1_update.assert_called_with(
|
|
r['router']['id'],
|
|
ipv6_ndra_profile_id='neutron-no-slaac-profile')
|
|
|
|
self._delete('subnets', s1['subnet']['id'])
|
|
self._delete('subnets', s2['subnet']['id'])
|
|
|
|
def test_slaac_profile_multi_net(self):
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update") as t1_update:
|
|
|
|
with self.router() as r,\
|
|
self.network() as n1, self.network() as n2:
|
|
with self.subnet(network=n1, cidr='fd00::0/64',
|
|
gateway_ip='fd00::1', ip_version=6,
|
|
enable_dhcp=False) as s1,\
|
|
self.subnet(network=n2, cidr='fd10::0/64',
|
|
gateway_ip='fd10::1', ip_version=6,
|
|
ipv6_address_mode='slaac',
|
|
ipv6_ra_mode='slaac') as s2,\
|
|
self.subnet(network=n2, cidr='2.3.3.0/24',
|
|
gateway_ip='2.3.3.1') as s3:
|
|
|
|
# Add three subnets to the router, with slaac-enabled one
|
|
# in the middle
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s1['subnet']['id'],
|
|
None)
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s2['subnet']['id'],
|
|
None)
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s3['subnet']['id'],
|
|
None)
|
|
# Validate T1 was updated with slaac profile
|
|
t1_update.assert_called_with(
|
|
r['router']['id'],
|
|
ipv6_ndra_profile_id='neutron-slaac-profile')
|
|
|
|
# Remove non-slaac subnets first
|
|
self._router_interface_action('remove',
|
|
r['router']['id'],
|
|
s1['subnet']['id'],
|
|
None)
|
|
self._router_interface_action('remove',
|
|
r['router']['id'],
|
|
s3['subnet']['id'],
|
|
None)
|
|
self._router_interface_action('remove',
|
|
r['router']['id'],
|
|
s2['subnet']['id'],
|
|
None)
|
|
# Validate T1 was updated with default profile
|
|
t1_update.assert_called_with(
|
|
r['router']['id'],
|
|
ipv6_ndra_profile_id='neutron-no-slaac-profile')
|
|
|
|
def _test_router_add_dual_stack_subnets(self, s6_first=False):
|
|
"""Add dual stack subnets to router"""
|
|
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicySegmentApi.update") as seg_update:
|
|
|
|
with self.router() as r, self.network() as n:
|
|
with self.subnet(
|
|
network=n, cidr='fd00::0/64', gateway_ip='fd00::1',
|
|
ip_version=6, enable_dhcp=False) as s6, self.subnet(
|
|
network=n, cidr='2.0.0.0/24',
|
|
gateway_ip='2.0.0.1') as s4:
|
|
|
|
subnets = []
|
|
if s6_first:
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s6['subnet']['id'],
|
|
None)
|
|
subnets.append(s6['subnet']['cidr'])
|
|
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s4['subnet']['id'],
|
|
None)
|
|
subnets.append(s4['subnet']['cidr'])
|
|
|
|
if not s6_first:
|
|
self._router_interface_action('add',
|
|
r['router']['id'],
|
|
s6['subnet']['id'],
|
|
None)
|
|
subnets.append(s6['subnet']['cidr'])
|
|
|
|
# We expect two subnet objects on segment
|
|
seg_update.assert_called_with(
|
|
n['network']['id'],
|
|
subnets=[mock.ANY, mock.ANY],
|
|
tier1_id=r['router']['id'])
|
|
|
|
def test_router_add_v4_v6_subnets(self):
|
|
self._test_router_add_dual_stack_subnets()
|
|
|
|
def test_router_add_v6_v4_subnets(self):
|
|
self._test_router_add_dual_stack_subnets(s6_first=True)
|
|
|
|
def test_router_remove_dual_stack_subnets(self):
|
|
"""Delete dual stack subnets from router interface"""
|
|
|
|
with self.router() as r, self.network() as n:
|
|
with self.subnet(network=n, cidr='fd00::0/64',
|
|
ip_version=6, enable_dhcp=False) as s6, \
|
|
self.subnet(network=n, cidr='2.0.0.0/24') as s4:
|
|
|
|
body6 = self._router_interface_action('add', r['router']['id'],
|
|
s6['subnet']['id'],
|
|
None)
|
|
body4 = self._router_interface_action('add', r['router']['id'],
|
|
s4['subnet']['id'], None)
|
|
port = self._show('ports', body6['port_id'])
|
|
self.assertEqual(1, len(port['port']['fixed_ips']))
|
|
port = self._show('ports', body4['port_id'])
|
|
self.assertEqual(1, len(port['port']['fixed_ips']))
|
|
self._router_interface_action('remove', r['router']['id'],
|
|
s6['subnet']['id'], None)
|
|
self._router_interface_action('remove', r['router']['id'],
|
|
s4['subnet']['id'], None)
|
|
|
|
def test_router_add_interface_ipv6_single_subnet(self):
|
|
with self.router() as r, self.network() as n:
|
|
with self.subnet(network=n, cidr='fd00::1/64',
|
|
gateway_ip='fd00::1', ip_version=6,
|
|
enable_dhcp=False) as s:
|
|
self._test_router_add_interface_subnet(r, s)
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_route_clear_routes_with_None(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_route_clear_routes_with_None()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_route_update_with_multi_routes(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_route_update_with_multi_routes()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_route_update_with_one_route(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_route_update_with_one_route()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_router_update_delete_routes(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_delete_routes()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_router_interface_in_use_by_route(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_interface_in_use_by_route()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_update_gateway_with_external_ip_used_by_gw(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway_with_external_ip_used_by_gw()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_update_gateway_with_invalid_external_ip(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway_with_invalid_external_ip()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_update_gateway_with_invalid_external_subnet(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway_with_invalid_external_subnet()
|
|
|
|
@common_v3.with_external_network
|
|
def test_router_update_gateway_with_different_external_subnet(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway_with_different_external_subnet()
|
|
|
|
@common_v3.with_external_subnet_once
|
|
def test_router_update_gateway_with_existed_floatingip(self):
|
|
with self.subnet(cidr='20.0.0.0/24') as subnet:
|
|
self._set_net_external(subnet['subnet']['network_id'])
|
|
with self.floatingip_with_assoc() as fip:
|
|
self._add_external_gateway_to_router(
|
|
fip['floatingip']['router_id'],
|
|
subnet['subnet']['network_id'],
|
|
expected_code=exc.HTTPConflict.code)
|
|
|
|
@common_v3.with_external_network
|
|
def test_router_update_gateway_add_multiple_prefixes_ipv6(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway_add_multiple_prefixes_ipv6()
|
|
|
|
@common_v3.with_external_network
|
|
def test_router_concurrent_delete_upon_subnet_create(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_concurrent_delete_upon_subnet_create()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_add_gateway_dup_subnet2_returns_400(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_add_gateway_dup_subnet2_returns_400()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_update_gateway(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_update_gateway()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_create_with_gwinfo(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_create_with_gwinfo()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_clear_gateway_callback_failure_returns_409(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_clear_gateway_callback_failure_returns_409()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_create_with_gwinfo_ext_ip(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_create_with_gwinfo_ext_ip()
|
|
|
|
@common_v3.with_external_network
|
|
def test_router_create_with_gwinfo_ext_ip_subnet(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_create_with_gwinfo_ext_ip_subnet()
|
|
|
|
@common_v3.with_external_subnet_second_time
|
|
def test_router_delete_with_floatingip_existed_returns_409(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_delete_with_floatingip_existed_returns_409()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_add_and_remove_gateway_tenant_ctx(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_add_and_remove_gateway_tenant_ctx()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_router_add_and_remove_gateway(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_add_and_remove_gateway()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_floatingip_list_with_sort(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_list_with_sort()
|
|
|
|
@common_v3.with_external_subnet_once
|
|
def test_floatingip_with_assoc_fails(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_with_assoc_fails()
|
|
|
|
@common_v3.with_external_subnet_second_time
|
|
def test_floatingip_update_same_fixed_ip_same_port(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_update_same_fixed_ip_same_port()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_floatingip_list_with_pagination_reverse(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_list_with_pagination_reverse()
|
|
|
|
@common_v3.with_external_subnet_once
|
|
def test_floatingip_association_on_unowned_router(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_association_on_unowned_router()
|
|
|
|
@common_v3.with_external_network
|
|
def test_delete_ext_net_with_disassociated_floating_ips(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_delete_ext_net_with_disassociated_floating_ips()
|
|
|
|
@common_v3.with_external_network
|
|
def test_create_floatingip_with_subnet_and_invalid_fip_address(self):
|
|
super(
|
|
NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_subnet_and_invalid_fip_address()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_create_floatingip_with_duplicated_specific_ip(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_duplicated_specific_ip()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_create_floatingip_with_subnet_id_non_admin(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_subnet_id_non_admin()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_floatingip_list_with_pagination(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_list_with_pagination()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_create_floatingips_native_quotas(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingips_native_quotas()
|
|
|
|
@common_v3.with_external_network
|
|
def test_create_floatingip_with_multisubnet_id(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_multisubnet_id()
|
|
|
|
@common_v3.with_external_network
|
|
def test_create_floatingip_with_subnet_id_and_fip_address(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_subnet_id_and_fip_address()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_create_floatingip_with_specific_ip(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_specific_ip()
|
|
|
|
@common_v3.with_external_network
|
|
def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4()
|
|
|
|
@common_v3.with_external_subnet_once
|
|
def test_create_floatingip_non_admin_context_agent_notification(self):
|
|
super(
|
|
NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_non_admin_context_agent_notification()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_create_floatingip_no_ext_gateway_return_404(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_no_ext_gateway_return_404()
|
|
|
|
@common_v3.with_external_subnet
|
|
def test_create_floatingip_with_specific_ip_out_of_allocation(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_specific_ip_out_of_allocation()
|
|
|
|
@common_v3.with_external_subnet_third_time
|
|
def test_floatingip_update_different_router(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_floatingip_update_different_router()
|
|
|
|
def test_floatingip_update(self):
|
|
super(NsxPTestL3NatTestCase, self).test_floatingip_update(
|
|
expected_status=constants.FLOATINGIP_STATUS_DOWN)
|
|
|
|
@common_v3.with_external_subnet_second_time
|
|
def test_floatingip_with_invalid_create_port(self):
|
|
self._test_floatingip_with_invalid_create_port(self._plugin_name)
|
|
|
|
def test_router_add_gateway_notifications(self):
|
|
with self.router() as r,\
|
|
self._create_l3_ext_network() as ext_net,\
|
|
self.subnet(network=ext_net, enable_dhcp=False):
|
|
with mock.patch.object(registry, 'publish') as publish:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'], ext_net['network']['id'])
|
|
expected = [mock.call(
|
|
resources.ROUTER_GATEWAY,
|
|
events.AFTER_CREATE, mock.ANY,
|
|
payload=mock.ANY)]
|
|
publish.assert_has_calls(expected)
|
|
|
|
def test_router_add_gateway_no_subnet_forbidden(self):
|
|
with self.router() as r:
|
|
with self._create_l3_ext_network() as n:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'], n['network']['id'],
|
|
expected_code=exc.HTTPBadRequest.code)
|
|
|
|
def test_router_update_on_external_port(self):
|
|
with self.router() as r:
|
|
with self._create_l3_ext_network() as ext_net,\
|
|
self.subnet(network=ext_net, cidr='10.0.1.0/24',
|
|
enable_dhcp=False) as s:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s['subnet']['network_id'])
|
|
body = self._show('routers', r['router']['id'])
|
|
net_id = body['router']['external_gateway_info']['network_id']
|
|
self.assertEqual(net_id, s['subnet']['network_id'])
|
|
port_res = self._list_ports(
|
|
'json',
|
|
200,
|
|
s['subnet']['network_id'],
|
|
tenant_id=r['router']['tenant_id'],
|
|
device_owner=constants.DEVICE_OWNER_ROUTER_GW)
|
|
port_list = self.deserialize('json', port_res)
|
|
self.assertEqual(len(port_list['ports']), 1)
|
|
|
|
routes = [{'destination': '135.207.0.0/16',
|
|
'nexthop': '10.0.1.3'}]
|
|
|
|
self.assertRaises(n_exc.InvalidInput,
|
|
self.plugin_instance.update_router,
|
|
context.get_admin_context(),
|
|
r['router']['id'],
|
|
{'router': {'routes':
|
|
routes}})
|
|
|
|
updates = {'admin_state_up': False}
|
|
self.assertRaises(n_exc.InvalidInput,
|
|
self.plugin_instance.update_router,
|
|
context.get_admin_context(),
|
|
r['router']['id'],
|
|
{'router': updates})
|
|
|
|
self._remove_external_gateway_from_router(
|
|
r['router']['id'],
|
|
s['subnet']['network_id'])
|
|
body = self._show('routers', r['router']['id'])
|
|
gw_info = body['router']['external_gateway_info']
|
|
self.assertIsNone(gw_info)
|
|
|
|
@mock.patch.object(nsx_plugin.NsxPolicyPlugin,
|
|
'validate_availability_zones')
|
|
def test_create_router_with_availability_zone(self, mock_validate_az):
|
|
name = 'rtr-with-zone'
|
|
zone = ['zone1']
|
|
mock_validate_az.return_value = None
|
|
with self.router(name=name, availability_zone_hints=zone) as rtr:
|
|
az_hints = rtr['router']['availability_zone_hints']
|
|
self.assertListEqual(zone, az_hints)
|
|
|
|
def test_update_router_distinct_edge_cluster(self):
|
|
# define an edge cluster in the config
|
|
edge_cluster = uuidutils.generate_uuid()
|
|
cfg.CONF.set_override('edge_cluster', edge_cluster, 'nsx_p')
|
|
self._initialize_azs()
|
|
path_prefix = ("/infra/sites/default/enforcement-points/default/"
|
|
"edge-clusters/")
|
|
# create a router and external network
|
|
with self.router() as r, \
|
|
self._create_l3_ext_network() as ext_net, \
|
|
self.subnet(network=ext_net, cidr='10.0.1.0/24',
|
|
enable_dhcp=False) as s, \
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.get_edge_cluster_path",
|
|
return_value=False), \
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.set_edge_cluster_path"
|
|
) as add_srv_router,\
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.get_realized_id"):
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s['subnet']['network_id'])
|
|
add_srv_router.assert_called_once_with(
|
|
mock.ANY, '%s%s' % (path_prefix, edge_cluster))
|
|
|
|
def test_router_add_interface_cidr_overlapped_with_gateway(self):
|
|
with self.router() as r,\
|
|
self._create_l3_ext_network() as ext_net,\
|
|
self.subnet(cidr='10.0.1.0/24') as s1,\
|
|
self.subnet(network=ext_net, cidr='10.0.0.0/16',
|
|
enable_dhcp=False) as s2:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s2['subnet']['network_id'])
|
|
res = self._router_interface_action(
|
|
'add', r['router']['id'],
|
|
s1['subnet']['id'], None,
|
|
expected_code=exc.HTTPBadRequest.code)
|
|
self.assertIn('NeutronError', res)
|
|
|
|
def test_router_add_gateway_overlapped_with_interface_cidr(self):
|
|
with self.router() as r,\
|
|
self._create_l3_ext_network() as ext_net,\
|
|
self.subnet(cidr='10.0.1.0/24') as s1,\
|
|
self.subnet(network=ext_net, cidr='10.0.0.0/16',
|
|
enable_dhcp=False) as s2:
|
|
self._router_interface_action(
|
|
'add', r['router']['id'],
|
|
s1['subnet']['id'], None)
|
|
res = self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s2['subnet']['network_id'],
|
|
expected_code=exc.HTTPBadRequest.code)
|
|
self.assertIn('NeutronError', res)
|
|
|
|
def test_router_add_interface_by_port_cidr_overlapped_with_gateway(self):
|
|
with self.router() as r,\
|
|
self._create_l3_ext_network() as ext_net,\
|
|
self.subnet(cidr='10.0.1.0/24') as s1,\
|
|
self.subnet(network=ext_net, cidr='10.0.0.0/16',
|
|
enable_dhcp=False) as s2,\
|
|
self.port(subnet=s1) as p:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s2['subnet']['network_id'])
|
|
|
|
res = self._router_interface_action(
|
|
'add', r['router']['id'],
|
|
None,
|
|
p['port']['id'],
|
|
expected_code=exc.HTTPBadRequest.code)
|
|
self.assertIn('NeutronError', res)
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_with_assoc_to_ipv6_subnet()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_router_add_interface_ipv6_subnet_without_gateway_ip(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_router_add_interface_ipv6_subnet_without_gateway_ip()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_router_add_interface_multiple_ipv6_subnets_different_net(self):
|
|
super(NsxPTestL3NatTestCase, self).\
|
|
test_router_add_interface_multiple_ipv6_subnets_different_net()
|
|
|
|
@common_v3.with_disable_dhcp
|
|
def test_create_floatingip_ipv6_only_network_returns_400(self):
|
|
super(NsxPTestL3NatTestCase,
|
|
self).test_create_floatingip_ipv6_only_network_returns_400()
|
|
|
|
def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
|
|
self.skipTest('DHCPv6 not supported')
|
|
|
|
def test_create_floatingip_invalid_fixed_ipv6_address_returns_400(self):
|
|
self.skipTest('Failed because of illegal port id')
|
|
|
|
def test_create_floatingip_with_router_interface_device_owner_fail(self):
|
|
# This tests that an error is raised when trying to assign a router
|
|
# interface port with floatingip.
|
|
|
|
with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub:
|
|
with self.port(
|
|
subnet=private_sub,
|
|
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p:
|
|
port_id = p['port']['id']
|
|
with self.router() as r:
|
|
self._router_interface_action('add', r['router']['id'],
|
|
None, port_id)
|
|
with self.external_network() as public_net, self.subnet(
|
|
network=public_net, cidr='12.0.0.0/24',
|
|
enable_dhcp=False) as public_sub:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
public_sub['subnet']['network_id'])
|
|
self._make_floatingip(
|
|
self.fmt, public_sub['subnet']['network_id'],
|
|
port_id=port_id,
|
|
http_status=exc.HTTPBadRequest.code)
|
|
|
|
def test_assign_floatingip_to_router_interface_device_owner_fail(self):
|
|
# This tests that an error is raised when trying to assign a router
|
|
# interface port with floatingip.
|
|
|
|
with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub:
|
|
with self.port(
|
|
subnet=private_sub,
|
|
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p:
|
|
port_id = p['port']['id']
|
|
with self.router() as r:
|
|
self._router_interface_action('add', r['router']['id'],
|
|
None, port_id)
|
|
with self.external_network() as public_net, self.subnet(
|
|
network=public_net, cidr='12.0.0.0/24',
|
|
enable_dhcp=False) as public_sub:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
public_sub['subnet']['network_id'])
|
|
fip = self._make_floatingip(self.fmt, public_sub[
|
|
'subnet']['network_id'])
|
|
self._update('floatingips', fip['floatingip'][
|
|
'id'], {'floatingip': {'port_id': port_id}},
|
|
expected_code=exc.HTTPBadRequest.code)
|
|
|
|
def test_router_delete_with_lb_service(self):
|
|
self.lb_mock1.stop()
|
|
self.lb_mock2.stop()
|
|
# Create the LB object - here the delete callback is registered
|
|
loadbalancer = loadbalancer_mgr.EdgeLoadBalancerManagerFromDict()
|
|
oct_listener = octavia_listener.NSXOctaviaListenerEndpoint(
|
|
loadbalancer=loadbalancer)
|
|
with self.router() as router:
|
|
with mock.patch.object(
|
|
self.plugin.nsxpolicy, 'search_by_tags',
|
|
return_value={'results': [{'id': 'dummy'}]}):
|
|
self.assertRaises(nc_exc.CallbackFailure,
|
|
self.plugin_instance.delete_router,
|
|
context.get_admin_context(),
|
|
router['router']['id'])
|
|
# Unregister callback
|
|
oct_listener._unsubscribe_router_delete_callback()
|
|
self.lb_mock1.start()
|
|
self.lb_mock2.start()
|
|
|
|
def test_router_delete_with_no_lb_service(self):
|
|
self.lb_mock1.stop()
|
|
self.lb_mock2.stop()
|
|
# Create the LB object - here the delete callback is registered
|
|
loadbalancer = loadbalancer_mgr.EdgeLoadBalancerManagerFromDict()
|
|
oct_listener = octavia_listener.NSXOctaviaListenerEndpoint(
|
|
loadbalancer=loadbalancer)
|
|
with self.router() as router:
|
|
with mock.patch.object(
|
|
self.plugin.nsxpolicy, 'search_by_tags',
|
|
return_value={'results': []}):
|
|
self.plugin_instance.delete_router(
|
|
context.get_admin_context(),
|
|
router['router']['id'])
|
|
# Unregister callback
|
|
oct_listener._unsubscribe_router_delete_callback()
|
|
self.lb_mock1.start()
|
|
self.lb_mock2.start()
|
|
|
|
def test_router_gw_info_rollback(self):
|
|
"""Fail the GW addition and verify rollback was performed"""
|
|
with self.router() as r,\
|
|
self.external_network() as public_net,\
|
|
self.subnet(network=public_net, cidr='12.0.0.0/24',
|
|
enable_dhcp=False) as s1,\
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update_route_advertisement",
|
|
side_effect=nsxlib_exc.NsxLibException):
|
|
# Make sure creation fails
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s1['subnet']['network_id'],
|
|
expected_code=exc.HTTPInternalServerError.code)
|
|
# Make sure there is no GW configured
|
|
body = self._show('routers', r['router']['id'])
|
|
self.assertIsNone(body['router']['external_gateway_info'])
|
|
|
|
def test_router_create_with_gw_info_failed(self):
|
|
"""Fail the GW addition during router creation
|
|
and verify rollback was performed
|
|
"""
|
|
with self.router() as r,\
|
|
self.external_network() as public_net,\
|
|
self.subnet(network=public_net, cidr='12.0.0.0/24',
|
|
enable_dhcp=False) as s1,\
|
|
mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update_route_advertisement",
|
|
side_effect=nsxlib_exc.NsxLibException):
|
|
# Make sure creation fails
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s1['subnet']['network_id'],
|
|
expected_code=exc.HTTPInternalServerError.code)
|
|
# Make sure there is no GW configured
|
|
body = self._show('routers', r['router']['id'])
|
|
self.assertIsNone(body['router']['external_gateway_info'])
|
|
|
|
def test_create_router_gateway_fails(self):
|
|
with self.external_network() as public_net,\
|
|
self.subnet(network=public_net, cidr='12.0.0.0/24',
|
|
enable_dhcp=False),\
|
|
mock.patch.object(self.plugin.nsxpolicy.tier1,
|
|
"get_edge_cluster_path",
|
|
return_value=False),\
|
|
mock.patch.object(self.plugin.nsxpolicy.tier1,
|
|
"set_edge_cluster_path",
|
|
side_effect=nsxlib_exc.NsxLibException):
|
|
data = {'router': {
|
|
'name': 'router1', 'admin_state_up': True,
|
|
'tenant_id': self._tenant_id,
|
|
'external_gateway_info': {
|
|
'network_id': public_net['network']['id']}}}
|
|
|
|
self.assertRaises(nsxlib_exc.NsxLibException,
|
|
self.plugin.create_router, self.ctx, data)
|
|
# Verify router doesn't persist on failure
|
|
routers = self.plugin.get_routers(self.ctx)
|
|
self.assertEqual(0, len(routers))
|
|
|
|
def test_delete_router_gateway_fails(self):
|
|
"""Verify that router deletion continues even if gw update fails"""
|
|
with self.router() as r,\
|
|
self.external_network() as public_net,\
|
|
self.subnet(network=public_net, cidr='12.0.0.0/24',
|
|
enable_dhcp=False) as s1:
|
|
self._add_external_gateway_to_router(
|
|
r['router']['id'],
|
|
s1['subnet']['network_id'])
|
|
with mock.patch("vmware_nsxlib.v3.policy.core_resources."
|
|
"NsxPolicyTier1Api.update_route_advertisement",
|
|
side_effect=nsxlib_exc.NsxLibException):
|
|
self._delete('routers', r['router']['id'])
|
|
routers = self.plugin.get_routers(self.ctx)
|
|
self.assertEqual(0, len(routers))
|