Merge "NSXv: Implicitly disable port-security for direct vnic-type ports"

This commit is contained in:
Jenkins 2017-08-24 16:34:13 +00:00 committed by Gerrit Code Review
commit 48601bf047
2 changed files with 105 additions and 70 deletions

View File

@ -1666,33 +1666,26 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context, port['port'], created_port)
return created_port
def _process_vnic_type(self, context, port_data, attrs,
has_security_groups, port_security):
vnic_type = attrs and attrs.get(pbin.VNIC_TYPE)
if attrs and validators.is_attr_set(vnic_type):
if vnic_type == pbin.VNIC_NORMAL:
pass
elif vnic_type in [pbin.VNIC_DIRECT,
pbin.VNIC_DIRECT_PHYSICAL]:
if has_security_groups or port_security:
err_msg = _("Direct/direct-physical VNIC type requires "
"no port security and no security groups!")
raise n_exc.InvalidInput(error_message=err_msg)
if not self._validate_network_type(
context, port_data['network_id'],
[c_utils.NsxVNetworkTypes.VLAN,
c_utils.NsxVNetworkTypes.FLAT,
c_utils.NsxVNetworkTypes.PORTGROUP]):
err_msg = _("Direct VNIC type requires VLAN, Flat or "
"Portgroup network!")
raise n_exc.InvalidInput(error_message=err_msg)
else:
err_msg = _("Only direct or normal VNIC types supported")
def _validate_port_direct_vnic_type(self, context, port_data):
vnic_type = port_data.get(pbin.VNIC_TYPE)
has_vnic_type = validators.is_attr_set(vnic_type)
if has_vnic_type and vnic_type in [pbin.VNIC_DIRECT,
pbin.VNIC_DIRECT_PHYSICAL]:
if not self._validate_network_type(
context, port_data['network_id'],
[c_utils.NsxVNetworkTypes.VLAN,
c_utils.NsxVNetworkTypes.FLAT,
c_utils.NsxVNetworkTypes.PORTGROUP]):
err_msg = _("'%s' vnic-type is only supported"
"for networks of type 'vlan', 'flat' or "
"'portgroup'.") % vnic_type
raise n_exc.InvalidInput(error_message=err_msg)
nsxv_db.update_nsxv_port_ext_attributes(
session=context.session,
port_id=port_data['id'],
vnic_type=vnic_type)
return vnic_type
elif has_vnic_type and vnic_type != pbin.VNIC_NORMAL:
err_msg = _("Invalid vnic-type %s."
"Supported vnic-types are 'normal', 'direct' and "
"'direct-physical'.") % vnic_type
raise n_exc.InvalidInput(error_message=err_msg)
def _validate_extra_dhcp_options(self, opts):
if not opts:
@ -1732,30 +1725,42 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
neutron_db = super(NsxVPluginV2, self).create_port(context, port)
self._extension_manager.process_create_port(
context, port_data, neutron_db)
# Port port-security is decided by the port-security state on the
# network it belongs to, unless specifically specified here
if validators.is_attr_set(port_data.get(psec.PORTSECURITY)):
port_security = port_data[psec.PORTSECURITY]
direct_vnic_type = self._validate_port_direct_vnic_type(context,
port_data)
# Port port-security is decided based on port's vnic_type and ports
# network port-security state (unless explicitly requested
# differently by the user).
port_security = port_data.get(psec.PORTSECURITY)
if validators.is_attr_set(port_security):
# 'direct' and 'direct-physical' vnic types ports requires
# port-security to be disabled.
if direct_vnic_type and port_security:
err_msg = _("Security features are not supported for "
"ports with direct/direct-physical VNIC type.")
raise n_exc.InvalidInput(error_message=err_msg)
elif direct_vnic_type:
# Implicitly disable port-security for direct vnic types.
port_security = False
else:
port_security = self._get_network_security_binding(
context, neutron_db['network_id'])
port_data[psec.PORTSECURITY] = port_security
port_data[psec.PORTSECURITY] = port_security
self._process_port_port_security_create(
context, port_data, neutron_db)
# Update fields obtained from neutron db (eg: MAC address)
port["port"].update(neutron_db)
has_ip = self._ip_on_port(neutron_db)
provider_sg_specified = (validators.is_attr_set(
port_data.get(provider_sg.PROVIDER_SECURITYGROUPS))
and port_data[provider_sg.PROVIDER_SECURITYGROUPS] != [])
if provider_sg_specified and not port_security:
err_msg = _("Can't disable port security when there are "
"provider rules")
raise n_exc.InvalidInput(error_message=err_msg)
has_security_groups = (
self._check_update_has_security_groups(port))
self._process_port_port_security_create(
context, port_data, neutron_db)
# Update fields obtained from neutron db (eg: MAC address)
port["port"].update(neutron_db)
has_ip = self._ip_on_port(neutron_db)
# allowed address pair checks
attrs = port[port_def.RESOURCE_NAME]
if self._check_update_has_allowed_address_pairs(port):
@ -1781,6 +1786,12 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
port_data,
ssgids)
if direct_vnic_type:
nsxv_db.update_nsxv_port_ext_attributes(
session=context.session,
port_id=port_data['id'],
vnic_type=direct_vnic_type)
self._process_portbindings_create_and_update(context,
port['port'],
port_data)
@ -1789,9 +1800,6 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context, neutron_db,
attrs.get(addr_pair.ADDRESS_PAIRS)))
self._process_vnic_type(context, port_data, attrs,
has_security_groups,
port_security)
self._process_port_create_extra_dhcp_opts(
context, port_data, dhcp_opts)
@ -2016,6 +2024,19 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
ret_port.update(port['port'])
has_ip = self._ip_on_port(ret_port)
direct_vnic_type = self._validate_port_direct_vnic_type(context,
ret_port)
if direct_vnic_type and has_port_security:
err_msg = _("Security features are not supported for "
"ports with direct/direct-physical VNIC type.")
raise n_exc.InvalidInput(error_message=err_msg)
if direct_vnic_type:
nsxv_db.update_nsxv_port_ext_attributes(
session=context.session,
port_id=ret_port['id'],
vnic_type=direct_vnic_type)
# checks that if update adds/modify security groups,
# then port has ip and port-security
if not (has_ip and has_port_security):
@ -2045,9 +2066,6 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
update_assigned_addresses = self.update_address_pairs_on_port(
context, id, port, original_port, ret_port)
self._process_vnic_type(context, ret_port, attrs,
has_security_groups,
has_port_security)
self._update_extra_dhcp_opts_on_port(context, id, port,
ret_port)

View File

@ -43,6 +43,7 @@ import neutron.tests.unit.extensions.test_securitygroup as ext_sg
from neutron.tests.unit import testlib_api
from neutron_lib.api.definitions import address_scope as addr_apidef
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import provider_net as pnet
from neutron_lib.api import validators
@ -1200,6 +1201,17 @@ class TestPortsV2(NsxVPluginV2TestCase,
portbindings.VNIC_TYPE,
self.vnic_type))
def test_port_invalid_vnic_type(self):
with self._test_create_direct_network(vlan_id=7) as network:
kwargs = {portbindings.VNIC_TYPE: 'invalid',
psec.PORTSECURITY: False}
net_id = network['network']['id']
res = self._create_port(self.fmt, net_id=net_id,
arg_list=(portbindings.VNIC_TYPE,
psec.PORTSECURITY),
**kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_range_allocation(self):
self.skipTest('Multiple fixed ips on a port are not supported')
@ -1524,46 +1536,46 @@ class TestPortsV2(NsxVPluginV2TestCase,
def test_create_port_vnic_direct(self):
with self._test_create_direct_network(vlan_id=7) as network:
# Check that port security conflicts
kwargs = {'binding:vnic_type': 'direct'}
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
psec.PORTSECURITY: True}
net_id = network['network']['id']
res = self._create_port(self.fmt, net_id=net_id,
arg_list=(portbindings.VNIC_TYPE,),
arg_list=(portbindings.VNIC_TYPE,
psec.PORTSECURITY),
**kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
# Check that security group conflicts
kwargs = {'binding:vnic_type': 'direct',
'security_groups':
['4cd70774-cc67-4a87-9b39-7d1db38eb087'],
'port_security_enabled': False}
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
'security_groups': [
'4cd70774-cc67-4a87-9b39-7d1db38eb087'],
psec.PORTSECURITY: False}
net_id = network['network']['id']
res = self._create_port(self.fmt, net_id=net_id,
arg_list=(portbindings.VNIC_TYPE,
'port_security_enabled'),
psec.PORTSECURITY),
**kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
# All is kosher so we can create the port
kwargs = {'binding:vnic_type': 'direct',
'port_security_enabled': False}
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT}
net_id = network['network']['id']
res = self._create_port(self.fmt, net_id=net_id,
arg_list=(portbindings.VNIC_TYPE,
'port_security_enabled'),
arg_list=(portbindings.VNIC_TYPE,),
**kwargs)
port = self.deserialize('json', res)
self.assertEqual("direct", port['port']['binding:vnic_type'])
self.assertEqual("direct", port['port'][portbindings.VNIC_TYPE])
def test_create_port_vnic_direct_invalid_network(self):
with self.network(name='not vlan/flat') as net:
kwargs = {'binding:vnic_type': 'direct',
'port_security_enabled': False}
kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT,
psec.PORTSECURITY: False}
net_id = net['network']['id']
res = self._create_port(self.fmt, net_id=net_id,
arg_list=(portbindings.VNIC_TYPE,
'port_security_enabled'),
psec.PORTSECURITY),
**kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_update_vnic_direct(self):
with self._test_create_direct_network(vlan_id=7) as network:
@ -1571,19 +1583,22 @@ class TestPortsV2(NsxVPluginV2TestCase,
with self.port(subnet=subnet) as port:
# need to do two updates as the update for port security
# disabled requires that it can only change 2 items
data = {'port': {'port_security_enabled': False,
data = {'port': {psec.PORTSECURITY: False,
'security_groups': []}}
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual('normal',
res['port']['binding:vnic_type'])
data = {'port': {'binding:vnic_type': 'direct'}}
self.assertEqual(portbindings.VNIC_NORMAL,
res['port'][portbindings.VNIC_TYPE])
data = {'port': {portbindings.VNIC_TYPE:
portbindings.VNIC_DIRECT}}
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual('direct',
res['port']['binding:vnic_type'])
self.assertEqual(portbindings.VNIC_DIRECT,
res['port'][portbindings.VNIC_TYPE])
def test_delete_network_port_exists_owned_by_network_port_not_found(self):
"""Tests that we continue to gracefully delete the network even if
@ -3711,8 +3726,10 @@ class TestExclusiveRouterTestCase(L3NatTest, L3NatTestCaseBase,
# change address scope of the first subnetpool
with self.address_scope(name='as2') as addr_scope2,\
mock.patch.object(edge_utils, 'update_nat_rules') as update_nat,\
mock.patch.object(edge_utils, 'update_firewall') as update_fw:
mock.patch.object(edge_utils,
'update_nat_rules') as update_nat,\
mock.patch.object(edge_utils,
'update_firewall') as update_fw:
as2_id = addr_scope2['address_scope']['id']
data = {'subnetpool': {