NSX|V3: Disallow port-security on port/net on ENS TZ

Raise an exception if a port or a network creation/update enables
port security on an ENS transport zone.

Change-Id: Ifbffec35c321d1ccf8c1aa00b4b3ed33140fb218
This commit is contained in:
Adit Sarfaty 2017-09-24 15:09:54 +03:00
parent 8703d956e8
commit bc5ceb6ac8
3 changed files with 198 additions and 34 deletions

View File

@ -206,3 +206,7 @@ class NsxVpnValidationError(NsxPluginException):
class NsxIPsecVpnMappingNotFound(n_exc.NotFound): class NsxIPsecVpnMappingNotFound(n_exc.NotFound):
message = _("Unable to find mapping for ipsec site connection: %(conn)s") message = _("Unable to find mapping for ipsec site connection: %(conn)s")
class NsxENSPortSecurity(n_exc.BadRequest):
message = _("Port security is not supported on ENS Transport zones")

View File

@ -667,7 +667,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
err_msg = None err_msg = None
net_type = network_data.get(pnet.NETWORK_TYPE) net_type = network_data.get(pnet.NETWORK_TYPE)
tz_type = self.nsxlib.transport_zone.TRANSPORT_TYPE_VLAN nsxlib_tz = self.nsxlib.transport_zone
tz_type = nsxlib_tz.TRANSPORT_TYPE_VLAN
switch_mode = nsxlib_tz.HOST_SWITCH_MODE_STANDARD
if validators.is_attr_set(net_type): if validators.is_attr_set(net_type):
if net_type == utils.NsxV3NetworkTypes.FLAT: if net_type == utils.NsxV3NetworkTypes.FLAT:
if vlan_id is not None: if vlan_id is not None:
@ -709,7 +711,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
err_msg = (_("Segmentation ID cannot be specified with " err_msg = (_("Segmentation ID cannot be specified with "
"%s network type") % "%s network type") %
utils.NsxV3NetworkTypes.GENEVE) utils.NsxV3NetworkTypes.GENEVE)
tz_type = self.nsxlib.transport_zone.TRANSPORT_TYPE_OVERLAY tz_type = nsxlib_tz.TRANSPORT_TYPE_OVERLAY
elif net_type == utils.NsxV3NetworkTypes.NSX_NETWORK: elif net_type == utils.NsxV3NetworkTypes.NSX_NETWORK:
# Linking neutron networks to an existing NSX logical switch # Linking neutron networks to an existing NSX logical switch
if physical_net is None: if physical_net is None:
@ -717,7 +719,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
"%s network type") % net_type) "%s network type") % net_type)
# Validate the logical switch existence # Validate the logical switch existence
try: try:
self.nsxlib.logical_switch.get(physical_net) nsx_net = self.nsxlib.logical_switch.get(physical_net)
switch_mode = nsxlib_tz.get_host_switch_mode(
nsx_net['transport_zone_id'])
except nsx_lib_exc.ResourceNotFound: except nsx_lib_exc.ResourceNotFound:
err_msg = (_('Logical switch %s does not exist') % err_msg = (_('Logical switch %s does not exist') %
physical_net) physical_net)
@ -747,10 +751,11 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
physical_net = az._default_overlay_tz_uuid physical_net = az._default_overlay_tz_uuid
# validate the transport zone existence and type # validate the transport zone existence and type
if (not err_msg and is_provider_net and physical_net and if (not err_msg and physical_net and
net_type != utils.NsxV3NetworkTypes.NSX_NETWORK): net_type != utils.NsxV3NetworkTypes.NSX_NETWORK):
if is_provider_net:
try: try:
backend_type = self.nsxlib.transport_zone.get_transport_type( backend_type = nsxlib_tz.get_transport_type(
physical_net) physical_net)
except nsx_lib_exc.ResourceNotFound: except nsx_lib_exc.ResourceNotFound:
err_msg = (_('Transport zone %s does not exist') % err_msg = (_('Transport zone %s does not exist') %
@ -760,11 +765,17 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
err_msg = (_('%(tz)s transport zone is required for ' err_msg = (_('%(tz)s transport zone is required for '
'creating a %(net)s provider network') % 'creating a %(net)s provider network') %
{'tz': tz_type, 'net': net_type}) {'tz': tz_type, 'net': net_type})
if not err_msg:
switch_mode = nsxlib_tz.get_host_switch_mode(physical_net)
if err_msg: if err_msg:
raise n_exc.InvalidInput(error_message=err_msg) raise n_exc.InvalidInput(error_message=err_msg)
return is_provider_net, net_type, physical_net, vlan_id return {'is_provider_net': is_provider_net,
'net_type': net_type,
'physical_net': physical_net,
'vlan_id': vlan_id,
'switch_mode': switch_mode}
def _get_edge_cluster(self, tier0_uuid): def _get_edge_cluster(self, tier0_uuid):
self._routerlib.validate_tier0(self.tier0_groups_dict, tier0_uuid) self._routerlib.validate_tier0(self.tier0_groups_dict, tier0_uuid)
@ -780,12 +791,19 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
return (True, utils.NetworkTypes.L3_EXT, tier0_uuid, 0) return (True, utils.NetworkTypes.L3_EXT, tier0_uuid, 0)
def _create_network_at_the_backend(self, context, net_data, az): def _create_network_at_the_backend(self, context, net_data, az):
is_provider_net, net_type, physical_net, vlan_id = ( provider_data = self._validate_provider_create(context, net_data, az)
self._validate_provider_create(context, net_data, az))
if is_provider_net and net_type == utils.NsxV3NetworkTypes.NSX_NETWORK: if (provider_data['switch_mode'] ==
self.nsxlib.transport_zone.HOST_SWITCH_MODE_ENS):
if net_data.get(psec.PORTSECURITY):
raise nsx_exc.NsxENSPortSecurity()
# set the default port security to False
net_data[psec.PORTSECURITY] = False
if (provider_data['is_provider_net'] and
provider_data['net_type'] == utils.NsxV3NetworkTypes.NSX_NETWORK):
# Network already exists on the NSX backend # Network already exists on the NSX backend
nsx_id = physical_net nsx_id = provider_data['physical_net']
else: else:
# Create network on the backend # Create network on the backend
neutron_net_id = net_data.get('id') or uuidutils.generate_uuid() neutron_net_id = net_data.get('id') or uuidutils.generate_uuid()
@ -802,21 +820,21 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
LOG.debug('create_network: %(net_name)s, %(physical_net)s, ' LOG.debug('create_network: %(net_name)s, %(physical_net)s, '
'%(tags)s, %(admin_state)s, %(vlan_id)s', '%(tags)s, %(admin_state)s, %(vlan_id)s',
{'net_name': net_name, {'net_name': net_name,
'physical_net': physical_net, 'physical_net': provider_data['physical_net'],
'tags': tags, 'tags': tags,
'admin_state': admin_state, 'admin_state': admin_state,
'vlan_id': vlan_id}) 'vlan_id': provider_data['vlan_id']})
nsx_result = self.nsxlib.logical_switch.create( nsx_result = self.nsxlib.logical_switch.create(
net_name, physical_net, tags, net_name, provider_data['physical_net'], tags,
admin_state=admin_state, admin_state=admin_state,
vlan_id=vlan_id, vlan_id=provider_data['vlan_id'],
description=net_data.get('description')) description=net_data.get('description'))
nsx_id = nsx_result['id'] nsx_id = nsx_result['id']
return (is_provider_net, return (provider_data['is_provider_net'],
net_type, provider_data['net_type'],
physical_net, provider_data['physical_net'],
vlan_id, provider_data['vlan_id'],
nsx_id) nsx_id)
def _is_overlay_network(self, context, network_id): def _is_overlay_network(self, context, network_id):
@ -1100,9 +1118,14 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
network) network)
self._extension_manager.process_update_network(context, net_data, self._extension_manager.process_update_network(context, net_data,
updated_net) updated_net)
if psec.PORTSECURITY in network['network']: if psec.PORTSECURITY in net_data:
# do not allow to enable port security on ENS networks
if (net_data[psec.PORTSECURITY] and
not original_net[psec.PORTSECURITY] and
self._is_ens_tz_net(context, id)):
raise nsx_exc.NsxENSPortSecurity()
self._process_network_port_security_update( self._process_network_port_security_update(
context, network['network'], updated_net) context, net_data, updated_net)
self._process_l3_update(context, updated_net, network['network']) self._process_l3_update(context, updated_net, network['network'])
self._extend_network_dict_provider(context, updated_net) self._extend_network_dict_provider(context, updated_net)
@ -1829,13 +1852,36 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
port_data.get(provider_sg.PROVIDER_SECURITYGROUPS) != []) port_data.get(provider_sg.PROVIDER_SECURITYGROUPS) != [])
return provider_sgs_specified return provider_sgs_specified
def _is_ens_tz_net(self, context, net_id):
#Check the host-switch-mode of the TZ connected to network
mappings = nsx_db.get_nsx_switch_ids(context.session, net_id)
if mappings:
nsx_net_id = nsx_net_id = mappings[0]
if nsx_net_id:
nsx_net = self.nsxlib.logical_switch.get(nsx_net_id)
if nsx_net and nsx_net.get('transport_zone_id'):
# Check the mode of this TZ
mode = self.nsxlib.transport_zone.get_host_switch_mode(
nsx_net['transport_zone_id'])
return (mode ==
self.nsxlib.transport_zone.HOST_SWITCH_MODE_ENS)
return False
def _is_ens_tz_port(self, context, port_data):
# Check the host-switch-mode of the TZ connected to the ports network
return self._is_ens_tz_net(context, port_data['network_id'])
def _create_port_preprocess_security( def _create_port_preprocess_security(
self, context, port, port_data, neutron_db): self, context, port, port_data, neutron_db):
(port_security, has_ip) = self._determine_port_security_and_has_ip( (port_security, has_ip) = self._determine_port_security_and_has_ip(
context, port_data) context, port_data)
port_data[psec.PORTSECURITY] = port_security port_data[psec.PORTSECURITY] = port_security
# No port security is allowed if the port belongs to an ENS TZ
if port_security and self._is_ens_tz_port(context, port_data):
raise nsx_exc.NsxENSPortSecurity()
self._process_port_port_security_create( self._process_port_port_security_create(
context, port_data, neutron_db) context, port_data, neutron_db)
# allowed address pair checks # allowed address pair checks
address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS) address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS)
if validators.is_attr_set(address_pairs): if validators.is_attr_set(address_pairs):
@ -2504,6 +2550,12 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
context, updated_port, context, updated_port,
updated_port[addr_pair.ADDRESS_PAIRS]) updated_port[addr_pair.ADDRESS_PAIRS])
# No port security is allowed if the port belongs to an ENS TZ
if (updated_port[psec.PORTSECURITY] and
psec.PORTSECURITY in port_data and
self._is_ens_tz_port(context, updated_port)):
raise nsx_exc.NsxENSPortSecurity()
# checks if security groups were updated adding/modifying # checks if security groups were updated adding/modifying
# security groups, port security is set and port has ip # security groups, port security is set and port has ip
provider_sgs_specified = self._provider_sgs_specified(updated_port) provider_sgs_specified = self._provider_sgs_specified(updated_port)
@ -3435,10 +3487,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_multiple_subnets_routers(self, context, router_id, net_id): def _validate_multiple_subnets_routers(self, context, router_id, net_id):
network = self.get_network(context, net_id) network = self.get_network(context, net_id)
net_type = network.get(pnet.NETWORK_TYPE) net_type = network.get(pnet.NETWORK_TYPE)
if (net_type and if (net_type and not self._is_overlay_network(context, net_id)):
not self.nsxlib.feature_supported(
nsxlib_consts.FEATURE_VLAN_ROUTER_INTERFACE) and
not self._is_overlay_network(context, net_id)):
err_msg = (_("Only overlay networks can be attached to a logical " err_msg = (_("Only overlay networks can be attached to a logical "
"router. Network %(net_id)s is a %(net_type)s based " "router. Network %(net_id)s is a %(net_type)s based "
"network") % {'net_id': net_id, 'net_type': net_type}) "network") % {'net_id': net_id, 'net_type': net_type})

View File

@ -38,6 +38,7 @@ from neutron.tests.unit.scheduler \
import test_dhcp_agent_scheduler as test_dhcpagent import test_dhcp_agent_scheduler as test_dhcpagent
from neutron_lib.api.definitions import address_scope as addr_apidef from neutron_lib.api.definitions import address_scope as addr_apidef
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import provider_net as pnet from neutron_lib.api.definitions import provider_net as pnet
from neutron_lib.callbacks import exceptions as nc_exc from neutron_lib.callbacks import exceptions as nc_exc
@ -453,6 +454,60 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxV3PluginTestCaseMixin):
# should fail # should fail
self.assertEqual('InvalidInput', data['NeutronError']['type']) self.assertEqual('InvalidInput', data['NeutronError']['type'])
def test_create_ens_network_with_no_port_sec(self):
providernet_args = {psec.PORTSECURITY: False}
with mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone."
"get_host_switch_mode", return_value="ENS"),\
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get",
return_value={'transport_zone_id': 'xxx'}):
result = self._create_network(fmt='json', name='ens_net',
admin_state_up=True,
providernet_args=providernet_args,
arg_list=(psec.PORTSECURITY,))
res = self.deserialize('json', result)
# should succeed, and net should have port security disabled
self.assertFalse(res['network']['port_security_enabled'])
def test_create_ens_network_with_port_sec(self):
providernet_args = {psec.PORTSECURITY: True}
with mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone."
"get_host_switch_mode", return_value="ENS"),\
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get",
return_value={'transport_zone_id': 'xxx'}):
result = self._create_network(fmt='json', name='ens_net',
admin_state_up=True,
providernet_args=providernet_args,
arg_list=(psec.PORTSECURITY,))
res = self.deserialize('json', result)
# should fail
self.assertEqual('NsxENSPortSecurity',
res['NeutronError']['type'])
def test_update_ens_network(self):
providernet_args = {psec.PORTSECURITY: False}
with mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone."
"get_host_switch_mode", return_value="ENS"),\
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get",
return_value={'transport_zone_id': 'xxx'}):
result = self._create_network(fmt='json', name='ens_net',
admin_state_up=True,
providernet_args=providernet_args,
arg_list=(psec.PORTSECURITY,))
net = self.deserialize('json', result)
net_id = net['network']['id']
args = {'network': {psec.PORTSECURITY: True}}
req = self.new_update_request('networks', args,
net_id, fmt='json')
res = self.deserialize('json', req.get_response(self.api))
# should fail
self.assertEqual('NsxENSPortSecurity',
res['NeutronError']['type'])
class TestSubnetsV2(test_plugin.TestSubnetsV2, NsxV3PluginTestCaseMixin): class TestSubnetsV2(test_plugin.TestSubnetsV2, NsxV3PluginTestCaseMixin):
@ -773,6 +828,62 @@ class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin,
# configured one # configured one
self.assertEqual(expected_prof, actual_profs[0]) self.assertEqual(expected_prof, actual_profs[0])
def test_create_ens_port_with_no_port_sec(self):
with self.subnet() as subnet,\
mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone."
"get_host_switch_mode", return_value="ENS"),\
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get",
return_value={'transport_zone_id': 'xxx'}):
args = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']}],
psec.PORTSECURITY: False}}
port_req = self.new_create_request('ports', args)
port = self.deserialize(self.fmt, port_req.get_response(self.api))
self.assertFalse(port['port']['port_security_enabled'])
def test_create_ens_port_with_port_sec(self):
with self.subnet() as subnet,\
mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone."
"get_host_switch_mode", return_value="ENS"),\
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get",
return_value={'transport_zone_id': 'xxx'}):
args = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']}],
psec.PORTSECURITY: True}}
port_req = self.new_create_request('ports', args)
res = self.deserialize('json', port_req.get_response(self.api))
# should fail
self.assertEqual('NsxENSPortSecurity',
res['NeutronError']['type'])
def test_update_ens_port(self):
with self.subnet() as subnet,\
mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone."
"get_host_switch_mode", return_value="ENS"),\
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get",
return_value={'transport_zone_id': 'xxx'}):
args = {'port': {'network_id': subnet['subnet']['network_id'],
'tenant_id': subnet['subnet']['tenant_id'],
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']}],
psec.PORTSECURITY: False}}
port_req = self.new_create_request('ports', args)
port = self.deserialize(self.fmt, port_req.get_response(self.api))
port_id = port['port']['id']
args = {'port': {psec.PORTSECURITY: True}}
req = self.new_update_request('ports', args, port_id)
res = self.deserialize('json', req.get_response(self.api))
# should fail
self.assertEqual('NsxENSPortSecurity',
res['NeutronError']['type'])
def test_update_port_update_ip_address_only(self): def test_update_port_update_ip_address_only(self):
self.skipTest('Multiple fixed ips on a port are not supported') self.skipTest('Multiple fixed ips on a port are not supported')