nsx v3 port security support
this patch adds the port security and address pairs extensions to the nsx v3 plugin. the patch also includes the core neutron unit tests for those extensions. NB: I did a good bit of manual testing with this code and didn't find any problems. however I'm not fully vested on all the other impl details so would be good to get some burn-in on this code by others. Change-Id: Iaca19428515a25ac6cf433c65cc81968e3175768
This commit is contained in:
parent
0c4652bede
commit
35bf6d16b8
@ -30,6 +30,7 @@ from neutron.common import topics
|
||||
from neutron.common import utils as neutron_utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import allowedaddresspairs_db as addr_pair_db
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import extradhcpopt_db
|
||||
@ -38,11 +39,14 @@ from neutron.db import l3_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import models_v2
|
||||
from neutron.db import portbindings_db
|
||||
from neutron.db import portsecurity_db
|
||||
from neutron.db import securitygroups_db
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.extensions import external_net as ext_net_extn
|
||||
from neutron.extensions import extra_dhcp_opt as ext_edo
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import portbindings as pbin
|
||||
from neutron.extensions import portsecurity as psec
|
||||
from neutron.extensions import providernet as pnet
|
||||
from neutron.extensions import securitygroup as ext_sg
|
||||
from neutron.i18n import _LE, _LI, _LW
|
||||
@ -67,14 +71,17 @@ from vmware_nsx.nsxlib.v3 import security
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
NSX_V3_PSEC_PROFILE_NAME = 'neutron_port_spoof_guard_profile'
|
||||
|
||||
|
||||
class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
|
||||
db_base_plugin_v2.NeutronDbPluginV2,
|
||||
securitygroups_db.SecurityGroupDbMixin,
|
||||
external_net_db.External_net_db_mixin,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
l3_gwmode_db.L3_NAT_db_mixin,
|
||||
portbindings_db.PortBindingMixin,
|
||||
portsecurity_db.PortSecurityDbMixin,
|
||||
agentschedulers_db.DhcpAgentSchedulerDbMixin,
|
||||
extradhcpopt_db.ExtraDhcpOptMixin):
|
||||
|
||||
@ -82,11 +89,13 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
__native_pagination_support = True
|
||||
__native_sorting_support = True
|
||||
|
||||
supported_extension_aliases = ["quotas",
|
||||
supported_extension_aliases = ["allowed-address-pairs",
|
||||
"quotas",
|
||||
"binding",
|
||||
"extra_dhcp_opt",
|
||||
"ext-gw-mode",
|
||||
"security-group",
|
||||
"port-security",
|
||||
"provider",
|
||||
"external-net",
|
||||
"extraroute",
|
||||
@ -109,6 +118,45 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.nsgroup_container, self.default_section = (
|
||||
security.init_nsgroup_container_and_default_section_rules())
|
||||
|
||||
LOG.debug("Initializing NSX v3 port spoofguard switching profile")
|
||||
self._switching_profiles = nsx_resources.SwitchingProfile(
|
||||
self._nsx_client)
|
||||
self._psec_profile = None
|
||||
self._psec_profile = self._init_port_security_profile()
|
||||
if not self._psec_profile:
|
||||
msg = (_("Unable to initialize NSX v3 port spoofguard "
|
||||
"switching profile: %s") % NSX_V3_PSEC_PROFILE_NAME)
|
||||
raise nsx_exc.NsxPluginException(msg)
|
||||
|
||||
def _get_port_security_profile_id(self):
|
||||
return nsx_resources.SwitchingProfile.build_switch_profile_ids(
|
||||
self._switching_profiles, self._get_port_security_profile())[0]
|
||||
|
||||
def _get_port_security_profile(self):
|
||||
if self._psec_profile:
|
||||
return self._psec_profile
|
||||
profile = self._switching_profiles.find_by_display_name(
|
||||
NSX_V3_PSEC_PROFILE_NAME)
|
||||
return profile[0] if profile else None
|
||||
|
||||
@utils.retry_upon_exception_nsxv3(Exception)
|
||||
def _init_port_security_profile(self):
|
||||
# NOTE(boden): potential race cond with distributed plugins
|
||||
# whereupon a different plugin could create the profile
|
||||
# after we don't find an existing one and create another
|
||||
profile = self._get_port_security_profile()
|
||||
if profile:
|
||||
return profile
|
||||
|
||||
self._switching_profiles.create_spoofguard_profile(
|
||||
NSX_V3_PSEC_PROFILE_NAME, 'Neutron Port Security Profile',
|
||||
whitelist_ports=True, whitelist_switches=False,
|
||||
tags=utils.build_v3_tags_payload({
|
||||
'id': NSX_V3_PSEC_PROFILE_NAME,
|
||||
'tenant_id': 'neutron-nsx-plugin'}))
|
||||
|
||||
return self._get_port_security_profile()
|
||||
|
||||
def _setup_rpc(self):
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
@ -259,12 +307,16 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self._create_network_at_the_backend(context, net_data))
|
||||
tenant_id = self._get_tenant_id_for_create(
|
||||
context, net_data)
|
||||
|
||||
self._ensure_default_security_group(context, tenant_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
# Create network in Neutron
|
||||
try:
|
||||
created_net = super(NsxV3Plugin, self).create_network(context,
|
||||
network)
|
||||
|
||||
self._process_network_port_security_create(
|
||||
context, net_data, created_net)
|
||||
self._process_l3_create(context, created_net, net_data)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
@ -308,6 +360,10 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
pnet._raise_if_updates_provider_attributes(net_data)
|
||||
updated_net = super(NsxV3Plugin, self).update_network(context, id,
|
||||
network)
|
||||
|
||||
if psec.PORTSECURITY in network['network']:
|
||||
self._process_network_port_security_update(
|
||||
context, network['network'], updated_net)
|
||||
self._process_l3_update(context, updated_net, network['network'])
|
||||
self._extend_network_dict_provider(context, updated_net)
|
||||
|
||||
@ -343,10 +399,18 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
# NOTE(arosen): nsx-v3 doesn't seem to handle ipv6 addresses
|
||||
# currently so for now we remove them here and do not pass
|
||||
# them to the backend which would raise an error.
|
||||
if(netaddr.IPNetwork(fixed_ip['ip_address']).version == 6):
|
||||
if netaddr.IPNetwork(fixed_ip['ip_address']).version == 6:
|
||||
continue
|
||||
address_bindings.append(nsx_resources.PacketAddressClassifier(
|
||||
fixed_ip['ip_address'], port['mac_address'], None))
|
||||
|
||||
for pair in port.get(addr_pair.ADDRESS_PAIRS):
|
||||
address_bindings.append(nsx_resources.PacketAddressClassifier(
|
||||
pair['ip_address'], pair['mac_address'], None))
|
||||
|
||||
# TODO(boden): this default pair is not needed with nsxv3 for dhcp
|
||||
address_bindings.append(nsx_resources.PacketAddressClassifier(
|
||||
'0.0.0.0', port['mac_address'], None))
|
||||
return address_bindings
|
||||
|
||||
def get_network(self, context, id, fields=None):
|
||||
@ -412,7 +476,8 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
return parent_name, tag
|
||||
|
||||
def _create_port_at_the_backend(self, context, neutron_db,
|
||||
port_data, l2gw_port_check):
|
||||
port_data, l2gw_port_check,
|
||||
psec_is_on):
|
||||
tags = utils.build_v3_tags_payload(port_data)
|
||||
parent_name, tag = self._get_data_from_binding_profile(
|
||||
context, port_data)
|
||||
@ -427,6 +492,11 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
# for ports plugged into a Bridge Endpoint.
|
||||
vif_uuid = port_data.get('device_id')
|
||||
attachment_type = port_data.get('device_owner')
|
||||
|
||||
profiles = None
|
||||
if psec_is_on:
|
||||
profiles = [self._get_port_security_profile_id()]
|
||||
|
||||
result = self._port_client.create(
|
||||
port_data['network_id'], vif_uuid,
|
||||
tags=tags,
|
||||
@ -434,7 +504,8 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
admin_state=port_data['admin_state_up'],
|
||||
address_bindings=address_bindings,
|
||||
attachment_type=attachment_type,
|
||||
parent_name=parent_name, parent_tag=tag)
|
||||
parent_name=parent_name, parent_tag=tag,
|
||||
switch_profile_ids=profiles)
|
||||
|
||||
# TODO(salv-orlando): The logical switch identifier in the
|
||||
# mapping object is not necessary anymore.
|
||||
@ -443,43 +514,73 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
neutron_db['network_id'], result['id'])
|
||||
return result
|
||||
|
||||
def create_port(self, context, port, l2gw_port_check=False):
|
||||
dhcp_opts = port['port'].get(ext_edo.EXTRADHCPOPTS, [])
|
||||
port_id = uuidutils.generate_uuid()
|
||||
port['port']['id'] = port_id
|
||||
def _create_port_preprocess_security(
|
||||
self, context, port, port_data, neutron_db):
|
||||
(port_security, has_ip) = self._determine_port_security_and_has_ip(
|
||||
context, port_data)
|
||||
port_data[psec.PORTSECURITY] = port_security
|
||||
self._process_port_port_security_create(
|
||||
context, port_data, neutron_db)
|
||||
# allowed address pair checks
|
||||
if attributes.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
|
||||
if not port_security:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
else:
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, neutron_db,
|
||||
port_data[addr_pair.ADDRESS_PAIRS])
|
||||
else:
|
||||
# remove ATTR_NOT_SPECIFIED
|
||||
port_data[addr_pair.ADDRESS_PAIRS] = []
|
||||
|
||||
if port_security and has_ip:
|
||||
self._ensure_default_security_group_on_port(context, port)
|
||||
elif self._check_update_has_security_groups(
|
||||
{'port': port_data}):
|
||||
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
|
||||
port_data[ext_sg.SECURITYGROUPS] = (
|
||||
self._get_security_groups_on_port(context, port))
|
||||
return port_security, has_ip
|
||||
|
||||
def create_port(self, context, port, l2gw_port_check=False):
|
||||
port_data = port['port']
|
||||
dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS, [])
|
||||
|
||||
self._ensure_default_security_group_on_port(context, port)
|
||||
# TODO(salv-orlando): Undo logical switch creation on failure
|
||||
with context.session.begin(subtransactions=True):
|
||||
neutron_db = super(NsxV3Plugin, self).create_port(context, port)
|
||||
self._process_portbindings_create_and_update(context,
|
||||
port['port'],
|
||||
neutron_db)
|
||||
|
||||
port["port"].update(neutron_db)
|
||||
|
||||
if not self._network_is_external(
|
||||
context, port['port']['network_id']):
|
||||
lport = self._create_port_at_the_backend(
|
||||
context, neutron_db, port['port'], l2gw_port_check)
|
||||
self._process_portbindings_create_and_update(context,
|
||||
port['port'],
|
||||
neutron_db)
|
||||
(is_psec_on, has_ip) = self._create_port_preprocess_security(
|
||||
context, port, port_data, neutron_db)
|
||||
self._process_portbindings_create_and_update(
|
||||
context, port['port'], port_data)
|
||||
self._process_port_create_extra_dhcp_opts(
|
||||
context, port_data, dhcp_opts)
|
||||
|
||||
neutron_db[pbin.VNIC_TYPE] = pbin.VNIC_NORMAL
|
||||
if (pbin.PROFILE in port['port'] and
|
||||
attributes.is_attr_set(port['port'][pbin.PROFILE])):
|
||||
neutron_db[pbin.PROFILE] = port['port'][pbin.PROFILE]
|
||||
self._process_port_create_extra_dhcp_opts(context, neutron_db,
|
||||
dhcp_opts)
|
||||
context.session.flush()
|
||||
|
||||
if not self._network_is_external(context, port_data['network_id']):
|
||||
lport = self._create_port_at_the_backend(
|
||||
context, neutron_db, port_data,
|
||||
l2gw_port_check, is_psec_on)
|
||||
|
||||
# For some reason the port bindings DB mixin does not handle
|
||||
# the VNIC_TYPE attribute, which is required by nova for
|
||||
# setting up VIFs.
|
||||
port_data[pbin.VNIC_TYPE] = pbin.VNIC_NORMAL
|
||||
|
||||
sgids = self._get_security_groups_on_port(context, port)
|
||||
if sgids is not None:
|
||||
self._process_port_create_security_group(
|
||||
context, neutron_db, sgids)
|
||||
context, port_data, sgids)
|
||||
#FIXME(abhiraut): Security group should not be processed for
|
||||
# a port belonging to an external network.
|
||||
# Below call will fail since there is no lport
|
||||
# in the backend.
|
||||
security.update_lport_with_security_groups(
|
||||
context, lport['id'], [], sgids)
|
||||
return neutron_db
|
||||
return port_data
|
||||
|
||||
def _pre_delete_port_check(self, context, port_id, l2gw_port_check):
|
||||
"""Perform checks prior to deleting a port."""
|
||||
@ -521,21 +622,100 @@ class NsxV3Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
|
||||
return ret_val
|
||||
|
||||
def _update_port_preprocess_security(
|
||||
self, context, port, id, updated_port):
|
||||
delete_addr_pairs = self._check_update_deletes_allowed_address_pairs(
|
||||
port)
|
||||
has_addr_pairs = self._check_update_has_allowed_address_pairs(port)
|
||||
has_security_groups = self._check_update_has_security_groups(port)
|
||||
delete_security_groups = self._check_update_deletes_security_groups(
|
||||
port)
|
||||
|
||||
# populate port_security setting
|
||||
if psec.PORTSECURITY not in port['port']:
|
||||
updated_port[psec.PORTSECURITY] = \
|
||||
self._get_port_security_binding(context, id)
|
||||
has_ip = self._ip_on_port(updated_port)
|
||||
# validate port security and allowed address pairs
|
||||
if not updated_port[psec.PORTSECURITY]:
|
||||
# has address pairs in request
|
||||
if has_addr_pairs:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
elif not delete_addr_pairs:
|
||||
# check if address pairs are in db
|
||||
updated_port[addr_pair.ADDRESS_PAIRS] = (
|
||||
self.get_allowed_address_pairs(context, id))
|
||||
if updated_port[addr_pair.ADDRESS_PAIRS]:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
|
||||
if delete_addr_pairs or has_addr_pairs:
|
||||
# delete address pairs and read them in
|
||||
self._delete_allowed_address_pairs(context, id)
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, updated_port,
|
||||
updated_port[addr_pair.ADDRESS_PAIRS])
|
||||
|
||||
# checks if security groups were updated adding/modifying
|
||||
# security groups, port security is set and port has ip
|
||||
if not (has_ip and updated_port[psec.PORTSECURITY]):
|
||||
if has_security_groups:
|
||||
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
|
||||
# Update did not have security groups passed in. Check
|
||||
# that port does not have any security groups already on it.
|
||||
filters = {'port_id': [id]}
|
||||
security_groups = (
|
||||
super(NsxV3Plugin, self)._get_port_security_group_bindings(
|
||||
context, filters)
|
||||
)
|
||||
if security_groups and not delete_security_groups:
|
||||
raise psec.PortSecurityPortHasSecurityGroup()
|
||||
|
||||
if delete_security_groups or has_security_groups:
|
||||
# delete the port binding and read it with the new rules.
|
||||
self._delete_port_security_group_bindings(context, id)
|
||||
sgids = self._get_security_groups_on_port(context, port)
|
||||
self._process_port_create_security_group(context, updated_port,
|
||||
sgids)
|
||||
|
||||
if psec.PORTSECURITY in port['port']:
|
||||
self._process_port_port_security_update(
|
||||
context, port['port'], updated_port)
|
||||
|
||||
return updated_port
|
||||
|
||||
def update_port(self, context, id, port):
|
||||
original_port = super(NsxV3Plugin, self).get_port(context, id)
|
||||
_, nsx_lport_id = nsx_db.get_nsx_switch_and_port_id(
|
||||
context.session, id)
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
updated_port = super(NsxV3Plugin, self).update_port(context,
|
||||
id, port)
|
||||
|
||||
# copy values over - except fixed_ips as
|
||||
# they've already been processed
|
||||
port['port'].pop('fixed_ips', None)
|
||||
updated_port.update(port['port'])
|
||||
self._update_extra_dhcp_opts_on_port(
|
||||
context, id, port, updated_port)
|
||||
|
||||
updated_port = self._update_port_preprocess_security(
|
||||
context, port, id, updated_port)
|
||||
|
||||
self._update_extra_dhcp_opts_on_port(context, id, port,
|
||||
updated_port)
|
||||
sec_grp_updated = self.update_security_group_on_port(
|
||||
context, id, port, original_port, updated_port)
|
||||
(port_security, has_ip) = self._determine_port_security_and_has_ip(
|
||||
context, updated_port)
|
||||
try:
|
||||
self._port_client.update(
|
||||
nsx_lport_id, name=port['port'].get('name'),
|
||||
admin_state=port['port'].get('admin_state_up'))
|
||||
nsx_lport_id, name=updated_port.get('name'),
|
||||
admin_state=updated_port.get('admin_state_up'),
|
||||
address_bindings=self._build_address_bindings(updated_port),
|
||||
switch_profile_ids=[self._get_port_security_profile_id()]
|
||||
if port_security else None)
|
||||
|
||||
security.update_lport_with_security_groups(
|
||||
context, nsx_lport_id,
|
||||
original_port.get(ext_sg.SECURITYGROUPS, []),
|
||||
|
@ -17,10 +17,12 @@ from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs
|
||||
|
||||
from vmware_nsx.tests.unit.nsx_mh import test_plugin as test_nsx_plugin
|
||||
from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants
|
||||
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
|
||||
|
||||
|
||||
class TestAllowedAddressPairs(test_nsx_plugin.NsxPluginV2TestCase,
|
||||
ext_pairs.TestAllowedAddressPairs):
|
||||
class TestAllowedAddressPairsNSXv2(test_nsx_plugin.NsxPluginV2TestCase,
|
||||
ext_pairs.TestAllowedAddressPairs):
|
||||
|
||||
# TODO(arosen): move to ext_pairs.TestAllowedAddressPairs once all
|
||||
# plugins do this correctly.
|
||||
@ -33,3 +35,16 @@ class TestAllowedAddressPairs(test_nsx_plugin.NsxPluginV2TestCase,
|
||||
|
||||
def test_create_port_security_false_allowed_address_pairs(self):
|
||||
self.skipTest('TBD')
|
||||
|
||||
|
||||
class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
|
||||
ext_pairs.TestAllowedAddressPairs):
|
||||
|
||||
def setUp(self, plugin=v3_constants.PLUGIN_NAME,
|
||||
ext_mgr=None,
|
||||
service_plugins=None):
|
||||
super(TestAllowedAddressPairsNSXv3, self).setUp(
|
||||
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins)
|
||||
|
||||
def test_create_port_security_false_allowed_address_pairs(self):
|
||||
self.skipTest('TBD')
|
||||
|
@ -19,10 +19,12 @@ from neutron.tests.unit.extensions import test_portsecurity as psec
|
||||
from vmware_nsx.common import sync
|
||||
from vmware_nsx.tests import unit as vmware
|
||||
from vmware_nsx.tests.unit.nsx_mh.apiclient import fake
|
||||
from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants
|
||||
from vmware_nsx.tests.unit.nsxlib.v3 import nsxlib_testcase
|
||||
from vmware_nsx.tests.unit import test_utils
|
||||
|
||||
|
||||
class PortSecurityTestCase(psec.PortSecurityDBTestCase):
|
||||
class PortSecurityTestCaseNSXv2(psec.PortSecurityDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
test_utils.override_nsx_ini_test()
|
||||
@ -36,11 +38,23 @@ class PortSecurityTestCase(psec.PortSecurityDBTestCase):
|
||||
patch_sync.start()
|
||||
|
||||
instance.return_value.request.side_effect = self.fc.fake_request
|
||||
super(PortSecurityTestCase, self).setUp(vmware.PLUGIN_NAME)
|
||||
super(PortSecurityTestCaseNSXv2, self).setUp(vmware.PLUGIN_NAME)
|
||||
self.addCleanup(self.fc.reset_all)
|
||||
self.addCleanup(self.mock_nsx.stop)
|
||||
self.addCleanup(patch_sync.stop)
|
||||
|
||||
|
||||
class TestPortSecurity(PortSecurityTestCase, psec.TestPortSecurity):
|
||||
class TestPortSecurityNSXv2(PortSecurityTestCaseNSXv2, psec.TestPortSecurity):
|
||||
pass
|
||||
|
||||
|
||||
class PortSecurityTestCaseNSXv3(nsxlib_testcase.NsxClientTestCase,
|
||||
psec.PortSecurityDBTestCase):
|
||||
def setUp(self, *args, **kwargs):
|
||||
super(PortSecurityTestCaseNSXv3, self).setUp(
|
||||
plugin=v3_constants.PLUGIN_NAME)
|
||||
|
||||
|
||||
class TestPortSecurityNSXv3(PortSecurityTestCaseNSXv3,
|
||||
psec.TestPortSecurity):
|
||||
pass
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
PLUGIN_NAME = 'vmware_nsx.plugins.nsx_v3.plugin.NsxV3Plugin'
|
||||
FAKE_NAME = "fake_name"
|
||||
FAKE_SWITCH_UUID = uuidutils.generate_uuid()
|
||||
|
||||
|
@ -23,6 +23,7 @@ from neutron.extensions import extraroute
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import l3_ext_gw_mode
|
||||
from neutron.extensions import providernet as pnet
|
||||
from neutron.extensions import securitygroup as secgrp
|
||||
from neutron import manager
|
||||
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
|
||||
from neutron.tests.unit.extensions import test_extra_dhcp_opt as test_dhcpopts
|
||||
@ -58,17 +59,17 @@ class NsxV3PluginTestCaseMixin(test_plugin.NeutronDbPluginV2TestCase,
|
||||
self.mock_api = nsx_v3_mocks.MockRequestSessionApi()
|
||||
self.client = nsx_client.NSX3Client()
|
||||
|
||||
mocked = nsxlib_testcase.NsxClientTestCase.mocked_session_module(
|
||||
nsx_plugin.security.firewall, self.client,
|
||||
mock_session=self.mock_api)
|
||||
mocked.start()
|
||||
self._patchers.append(mocked)
|
||||
def mock_client_module(mod):
|
||||
mocked = nsxlib_testcase.NsxClientTestCase.mocked_session_module(
|
||||
mod, self.client,
|
||||
mock_session=self.mock_api)
|
||||
mocked.start()
|
||||
self._patchers.append(mocked)
|
||||
|
||||
mock_client_module(nsx_plugin.security.firewall)
|
||||
mock_client_module(nsx_plugin.routerlib.nsxlib)
|
||||
mock_client_module(nsx_plugin)
|
||||
|
||||
mocked = nsxlib_testcase.NsxClientTestCase.mocked_session_module(
|
||||
nsx_plugin.routerlib.nsxlib, self.client,
|
||||
mock_session=self.mock_api)
|
||||
mocked.start()
|
||||
self._patchers.append(mocked)
|
||||
super(NsxV3PluginTestCaseMixin, self).setUp(plugin=plugin,
|
||||
ext_mgr=ext_mgr)
|
||||
|
||||
@ -117,7 +118,24 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxV3PluginTestCaseMixin):
|
||||
|
||||
|
||||
class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin):
|
||||
pass
|
||||
|
||||
def test_update_port_delete_ip(self):
|
||||
# This test case overrides the default because the nsx plugin
|
||||
# implements port_security/security groups and it is not allowed
|
||||
# to remove an ip address from a port unless the security group
|
||||
# is first removed.
|
||||
with self.subnet() as subnet:
|
||||
with self.port(subnet=subnet) as port:
|
||||
data = {'port': {'admin_state_up': False,
|
||||
'fixed_ips': [],
|
||||
secgrp.SECURITYGROUPS: []}}
|
||||
req = self.new_update_request('ports',
|
||||
data, port['port']['id'])
|
||||
res = self.deserialize('json', req.get_response(self.api))
|
||||
self.assertEqual(res['port']['admin_state_up'],
|
||||
data['port']['admin_state_up'])
|
||||
self.assertEqual(res['port']['fixed_ips'],
|
||||
data['port']['fixed_ips'])
|
||||
|
||||
|
||||
class TestSecurityGroups(ext_sg.TestSecurityGroups, NsxV3PluginTestCaseMixin):
|
||||
|
@ -19,6 +19,7 @@ import types
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
from vmware_nsx.nsxlib.v3 import client as nsx_client
|
||||
from vmware_nsx.tests.unit.nsx_v3 import mocks
|
||||
|
||||
@ -38,6 +39,9 @@ class NsxLibTestCase(unittest.TestCase):
|
||||
super(NsxLibTestCase, self).setUp()
|
||||
cfg.CONF.set_override('nsx_user', NSX_USER)
|
||||
cfg.CONF.set_override('nsx_password', NSX_PASSWORD)
|
||||
cfg.CONF.set_override('default_tz_uuid',
|
||||
uuidutils.generate_uuid())
|
||||
cfg.CONF.set_override('nsx_controllers', ['11.9.8.7', '11.9.8.77'])
|
||||
|
||||
cfg.CONF.set_override('nsx_user', NSX_USER, 'nsx_v3')
|
||||
cfg.CONF.set_override('nsx_password', NSX_PASSWORD, 'nsx_v3')
|
||||
@ -148,8 +152,17 @@ class NsxClientTestCase(NsxLibTestCase):
|
||||
return client_fn(*args, **kwargs)
|
||||
return _client
|
||||
|
||||
def _mock_client_init(*args, **kwargs):
|
||||
return with_client
|
||||
|
||||
fn_map = {}
|
||||
for fn in BRIDGE_FNS:
|
||||
fn_map[fn] = _call_client(fn)
|
||||
fn_map['NSX3Client'] = nsx_client.NSX3Client
|
||||
|
||||
fn_map['NSX3Client'] = _mock_client_init
|
||||
fn_map['JSONRESTClient'] = _mock_client_init
|
||||
fn_map['RESTClient'] = _mock_client_init
|
||||
|
||||
with_client.new_client_for = _mock_client_init
|
||||
|
||||
return cls.patch_client_module(in_module, fn_map)
|
||||
|
@ -55,6 +55,11 @@ class TestNsxV3L2GatewayDriver(test_l2gw_db.L2GWTestCase,
|
||||
self.l2gw_plugin = l2gw_plugin.NsxL2GatewayPlugin()
|
||||
self.context = context.get_admin_context()
|
||||
|
||||
def _get_nw_data(self):
|
||||
net_data = super(TestNsxV3L2GatewayDriver, self)._get_nw_data()
|
||||
net_data['network']['port_security_enabled'] = True
|
||||
return net_data
|
||||
|
||||
def test_nsxl2gw_driver_init(self):
|
||||
with mock.patch.object(nsx_v3_driver.NsxV3Driver,
|
||||
'_ensure_default_l2_gateway') as def_gw:
|
||||
|
Loading…
x
Reference in New Issue
Block a user