NSX|V3: Refactor network & port validations

refactor the validation code for network & port create & update operations
and moce those to the common plugin code.
This will be used later by the policy plugin code.

Change-Id: Ia461851022a20f07cb50d05dc73cc37f48752164
This commit is contained in:
Adit Sarfaty 2018-09-05 11:40:11 +03:00
parent 48481fdd6d
commit 7ebfb1062e
3 changed files with 214 additions and 158 deletions

View File

@ -142,6 +142,10 @@ class InvalidIPAddress(n_exc.InvalidInput):
message = _("'%(ip_address)s' must be a /32 CIDR based IPv4 address")
class QoSOnExternalNet(n_exc.InvalidInput):
message = _("Cannot configure QOS on external networks")
class SecurityGroupMaximumCapacityReached(NsxPluginException):
pass

View File

@ -24,8 +24,10 @@ from neutron.db import l3_db
from neutron.db import models_v2
from neutron_lib.api.definitions import address_scope as ext_address_scope
from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib.api.definitions import external_net as extnet_apidef
from neutron_lib.api.definitions import network as net_def
from neutron_lib.api.definitions import port as port_def
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import subnet as subnet_def
from neutron_lib.api import validators
from neutron_lib.api.validators import availability_zone as az_validator
@ -36,11 +38,14 @@ from neutron_lib import constants
from neutron_lib import context as n_context
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from neutron_lib.services.qos import constants as qos_consts
from neutron_lib.utils import net
from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.extensions import maclearning as mac_ext
from vmware_nsx.services.qos.common import utils as qos_com_utils
from vmware_nsx.services.vpnaas.nsxv3 import ipsec_utils
LOG = logging.getLogger(__name__)
@ -385,6 +390,168 @@ class NsxPluginBase(db_base_plugin_v2.NeutronDbPluginV2,
if qos_policy_id:
qos_com_utils.validate_policy_accessable(context, qos_policy_id)
def _validate_create_network(self, context, net_data):
"""Validate the parameters of the new network to be created
This method includes general validations that does not depend on
provider attributes, or plugin specific configurations
"""
external = net_data.get(extnet_apidef.EXTERNAL)
is_external_net = validators.is_attr_set(external) and external
with_qos = validators.is_attr_set(
net_data.get(qos_consts.QOS_POLICY_ID))
if with_qos:
self._validate_qos_policy_id(
context, net_data.get(qos_consts.QOS_POLICY_ID))
if is_external_net:
raise nsx_exc.QoSOnExternalNet()
def _validate_update_netowrk(self, context, id, original_net, net_data):
"""Validate the updated parameters of a network
This method includes general validations that does not depend on
provider attributes, or plugin specific configurations
"""
extern_net = self._network_is_external(context, id)
with_qos = validators.is_attr_set(
net_data.get(qos_consts.QOS_POLICY_ID))
# Do not allow QoS on external networks
if with_qos and extern_net:
raise nsx_exc.QoSOnExternalNet()
# Do not support changing external/non-external networks
if (extnet_apidef.EXTERNAL in net_data and
net_data[extnet_apidef.EXTERNAL] != extern_net):
err_msg = _("Cannot change the router:external flag of a network")
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_illegal_port_with_qos(self, device_owner):
# Prevent creating/update port with QoS policy
# on router-interface/network-dhcp ports.
if ((device_owner == l3_db.DEVICE_OWNER_ROUTER_INTF or
device_owner == constants.DEVICE_OWNER_DHCP)):
err_msg = _("Unable to create or update %s port with a QoS "
"policy") % device_owner
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_external_net_with_compute(self, port_data):
# Prevent creating port with device owner prefix 'compute'
# on external networks.
device_owner = port_data.get('device_owner')
if (device_owner is not None and
device_owner.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX)):
err_msg = _("Unable to update/create a port with an external "
"network")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _validate_create_port(self, context, port_data):
self._validate_max_ips_per_port(port_data.get('fixed_ips', []),
port_data.get('device_owner'))
is_external_net = self._network_is_external(
context, port_data['network_id'])
qos_selected = validators.is_attr_set(port_data.get(
qos_consts.QOS_POLICY_ID))
device_owner = port_data.get('device_owner')
# QoS validations
if qos_selected:
self._validate_qos_policy_id(
context, port_data.get(qos_consts.QOS_POLICY_ID))
self._assert_on_illegal_port_with_qos(device_owner)
if is_external_net:
raise nsx_exc.QoSOnExternalNet()
# External network validations:
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
self._assert_on_port_admin_state(port_data, device_owner)
def _assert_on_vpn_port_change(self, port_data):
if port_data['device_owner'] == ipsec_utils.VPN_PORT_OWNER:
msg = _('Can not update/delete VPNaaS port %s') % port_data['id']
raise n_exc.InvalidInput(error_message=msg)
def _assert_on_device_owner_change(self, port_data, orig_dev_own):
"""Prevent illegal device owner modifications
"""
if 'device_owner' not in port_data:
return
new_dev_own = port_data['device_owner']
if new_dev_own == orig_dev_own:
return
err_msg = (_("Changing port device owner '%(orig)s' to '%(new)s' is "
"not allowed") % {'orig': orig_dev_own,
'new': new_dev_own})
# Do not allow changing nova <-> neutron device owners
if ((orig_dev_own.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX) and
new_dev_own.startswith(constants.DEVICE_OWNER_NETWORK_PREFIX)) or
(orig_dev_own.startswith(constants.DEVICE_OWNER_NETWORK_PREFIX) and
new_dev_own.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX))):
raise n_exc.InvalidInput(error_message=err_msg)
# Do not allow removing the device owner in some cases
if orig_dev_own == constants.DEVICE_OWNER_DHCP:
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_port_sec_change(self, port_data, device_owner):
"""Do not allow enabling port security/mac learning of some ports
Trusted ports are created with port security and mac learning disabled
in neutron, and it should not change.
"""
if net.is_port_trusted({'device_owner': device_owner}):
if port_data.get(psec.PORTSECURITY) is True:
err_msg = _("port_security_enabled=True is not supported for "
"trusted ports")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
mac_learning = port_data.get(mac_ext.MAC_LEARNING)
if (validators.is_attr_set(mac_learning) and mac_learning is True):
err_msg = _("mac_learning_enabled=True is not supported for "
"trusted ports")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _validate_update_port(self, context, id, original_port, port_data):
qos_selected = validators.is_attr_set(port_data.get
(qos_consts.QOS_POLICY_ID))
is_external_net = self._network_is_external(
context, original_port['network_id'])
device_owner = (port_data['device_owner']
if 'device_owner' in port_data
else original_port.get('device_owner'))
# QoS validations
if qos_selected:
self._validate_qos_policy_id(
context, port_data.get(qos_consts.QOS_POLICY_ID))
if is_external_net:
raise nsx_exc.QoSOnExternalNet()
self._assert_on_illegal_port_with_qos(device_owner)
# External networks validations:
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
# Device owner validations:
self._assert_on_device_owner_change(
port_data, original_port.get('device_owner'))
self._assert_on_port_admin_state(port_data, device_owner)
self._assert_on_port_sec_change(port_data, device_owner)
self._validate_max_ips_per_port(
port_data.get('fixed_ips', []), device_owner)
self._assert_on_vpn_port_change(original_port)
def get_housekeeper(self, context, name, fields=None):
# run the job in readonly mode and get the results
self.housekeeper.run(context, name, readonly=True)

View File

@ -76,7 +76,6 @@ from neutron_lib import context as q_context
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import utils as plugin_utils
from neutron_lib.utils import helpers
from neutron_lib.utils import net as nlib_net
from oslo_config import cfg
from oslo_context import context as context_utils
from oslo_db import exception as db_exc
@ -122,7 +121,6 @@ from vmware_nsx.services.lbaas.nsx_v3.v2 import lb_driver_v2
from vmware_nsx.services.qos.common import utils as qos_com_utils
from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver
from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver
from vmware_nsx.services.vpnaas.nsxv3 import ipsec_utils
from vmware_nsxlib.v3 import core_resources as nsx_resources
from vmware_nsxlib.v3 import exceptions as nsx_lib_exc
from vmware_nsxlib.v3 import nsx_constants as nsxlib_consts
@ -1166,12 +1164,6 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
network[pnet.PHYSICAL_NETWORK] = bindings[0].phy_uuid
network[pnet.SEGMENTATION_ID] = bindings[0].vlan_id
def _assert_on_external_net_with_qos(self, net_data):
# Prevent creating/update external network with QoS policy
if validators.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
err_msg = _("Cannot configure QOS on external networks")
raise n_exc.InvalidInput(error_message=err_msg)
def get_subnets(self, context, filters=None, fields=None, sorts=None,
limit=None, marker=None, page_reverse=False):
filters = filters or {}
@ -1222,7 +1214,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def create_network(self, context, network):
net_data = network['network']
external = net_data.get(extnet_apidef.EXTERNAL)
is_backend_network = False
is_external_net = validators.is_attr_set(external) and external
is_ddi_network = False
tenant_id = net_data['tenant_id']
@ -1239,13 +1231,13 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if extensions.is_extension_supported(self, 'vlan-transparent'):
vlt = vlan_apidef.get_vlan_transparent(net_data)
nsx_net_id = None
self._validate_qos_policy_id(
context, net_data.get(qos_consts.QOS_POLICY_ID))
if validators.is_attr_set(external) and external:
self._assert_on_external_net_with_qos(net_data)
self._validate_create_network(context, net_data)
if is_external_net:
is_provider_net, net_type, physical_net, vlan_id = (
self._validate_external_net_create(net_data, az))
nsx_net_id = None
is_backend_network = False
else:
is_provider_net, net_type, physical_net, vlan_id, nsx_net_id = (
self._create_network_at_the_backend(context, net_data, az,
@ -1464,16 +1456,12 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
extern_net = self._network_is_external(context, id)
is_nsx_net = self._network_is_nsx_net(context, id)
is_ens_net = self._is_ens_tz_net(context, id)
if extern_net:
self._assert_on_external_net_with_qos(net_data)
else:
if is_ens_net:
self._assert_on_ens_with_qos(net_data)
# Do not support changing external/non-external networks
if (extnet_apidef.EXTERNAL in net_data and
net_data[extnet_apidef.EXTERNAL] != extern_net):
err_msg = _("Cannot change the router:external flag of a network")
raise n_exc.InvalidInput(error_message=err_msg)
# Validate the updated parameters
self._validate_update_netowrk(context, id, original_net, net_data)
# add some plugin specific validations
if is_ens_net:
self._assert_on_ens_with_qos(net_data)
updated_net = super(NsxV3Plugin, self).update_network(context, id,
network)
@ -2455,17 +2443,6 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
self._get_security_groups_on_port(context, port))
return port_security, has_ip, sgids, psgids
def _assert_on_external_net_with_compute(self, port_data):
# Prevent creating port with device owner prefix 'compute'
# on external networks.
device_owner = port_data.get('device_owner')
if (device_owner is not None and
device_owner.startswith(const.DEVICE_OWNER_COMPUTE_PREFIX)):
err_msg = _("Unable to update/create a port with an external "
"network")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_dhcp_relay_without_router(self, context, port_data,
original_port=None):
# Prevent creating/updating port with device owner prefix 'compute'
@ -2517,50 +2494,6 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if lport_id:
self.nsxlib.logical_port.delete(lport_id)
def _assert_on_external_net_port_with_qos(self, port_data):
# Prevent creating/update port with QoS policy
# on external networks.
if validators.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
err_msg = _("Unable to update/create a port with an external "
"network and a QoS policy")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_illegal_port_with_qos(self, port_data, device_owner):
# Prevent creating/update port with QoS policy
# on router-interface/network-dhcp ports.
if ((device_owner == l3_db.DEVICE_OWNER_ROUTER_INTF or
device_owner == const.DEVICE_OWNER_DHCP) and
validators.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID))):
err_msg = _("Unable to create or update %s port with a QoS "
"policy") % device_owner
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_device_owner_change(self, port_data, orig_dev_own):
"""Prevent illegal device owner modifications
"""
if 'device_owner' not in port_data:
return
new_dev_own = port_data['device_owner']
if new_dev_own == orig_dev_own:
return
err_msg = (_("Changing port device owner '%(orig)s' to '%(new)s' is "
"not allowed") % {'orig': orig_dev_own,
'new': new_dev_own})
# Do not allow changing nova <-> neutron device owners
if ((orig_dev_own.startswith(const.DEVICE_OWNER_COMPUTE_PREFIX) and
new_dev_own.startswith(const.DEVICE_OWNER_NETWORK_PREFIX)) or
(orig_dev_own.startswith(const.DEVICE_OWNER_NETWORK_PREFIX) and
new_dev_own.startswith(const.DEVICE_OWNER_COMPUTE_PREFIX))):
raise n_exc.InvalidInput(error_message=err_msg)
# Do not allow removing the device owner in some cases
if orig_dev_own == const.DEVICE_OWNER_DHCP:
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_port_admin_state(self, port_data, device_owner):
"""Do not allow changing the admin state of some ports"""
if (device_owner == l3_db.DEVICE_OWNER_ROUTER_INTF or
@ -2571,31 +2504,6 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_port_sec_change(self, port_data, device_owner):
"""Do not allow enabling port security/mac learning of some ports
Trusted ports are created with port security and mac learning disabled
in neutron, and it should not change.
"""
if nlib_net.is_port_trusted({'device_owner': device_owner}):
if port_data.get(psec.PORTSECURITY) is True:
err_msg = _("port_security_enabled=True is not supported for "
"trusted ports")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
mac_learning = port_data.get(mac_ext.MAC_LEARNING)
if (validators.is_attr_set(mac_learning) and mac_learning is True):
err_msg = _("mac_learning_enabled=True is not supported for "
"trusted ports")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def _assert_on_vpn_port_change(self, port_data):
if port_data['device_owner'] == ipsec_utils.VPN_PORT_OWNER:
msg = _('Can not update/delete VPNaaS port %s') % port_data['id']
raise n_exc.InvalidInput(error_message=msg)
def _filter_ipv4_dhcp_fixed_ips(self, context, fixed_ips):
ips = []
for fixed_ip in fixed_ips:
@ -2926,45 +2834,40 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
return (nsx_constants.VIF_TYPE_DVS if direct_vnic_type
else pbin.VIF_TYPE_OVS)
def create_port(self, context, port, l2gw_port_check=False):
port_data = port['port']
dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS)
self._validate_extra_dhcp_options(dhcp_opts)
self._validate_max_ips_per_port(port_data.get('fixed_ips', []),
port_data.get('device_owner'))
self._assert_on_dhcp_relay_without_router(context, port_data)
is_ens_tz_port = self._is_ens_tz_port(context, port_data)
def _validate_ens_create_port(self, context, port_data):
qos_selected = validators.is_attr_set(port_data.get(
qos_consts.QOS_POLICY_ID))
self._validate_qos_policy_id(
context, port_data.get(qos_consts.QOS_POLICY_ID))
if qos_selected:
err_msg = _("Cannot configure QOS on ENS networks")
raise n_exc.InvalidInput(error_message=err_msg)
if cfg.CONF.nsx_v3.disable_port_security_for_ens:
LOG.warning("Disabling port security for network %s",
port_data['network_id'])
port_data[psec.PORTSECURITY] = False
port_data['security_groups'] = []
def create_port(self, context, port, l2gw_port_check=False):
port_data = port['port']
# validate the new port parameters
self._validate_create_port(context, port_data)
# Add the plugin specific validations
dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS)
self._validate_extra_dhcp_options(dhcp_opts)
self._assert_on_dhcp_relay_without_router(context, port_data)
is_ens_tz_port = self._is_ens_tz_port(context, port_data)
if is_ens_tz_port:
if qos_selected:
err_msg = _("Cannot configure QOS on ENS networks")
raise n_exc.InvalidInput(error_message=err_msg)
self._validate_ens_create_port(context, port_data)
if cfg.CONF.nsx_v3.disable_port_security_for_ens:
LOG.warning("Disabling port security for network %s",
port_data['network_id'])
port_data[psec.PORTSECURITY] = False
port_data['security_groups'] = []
is_external_net = self._network_is_external(
context, port_data['network_id'])
direct_vnic_type = self._validate_port_vnic_type(
context, port_data, port_data['network_id'],
projectpluginmap.NsxPlugins.NSX_T)
with db_api.context_manager.writer.using(context):
is_external_net = self._network_is_external(
context, port_data['network_id'])
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
self._assert_on_external_net_port_with_qos(port_data)
self._assert_on_illegal_port_with_qos(
port_data, port_data.get('device_owner'))
self._assert_on_port_admin_state(
port_data, port_data.get('device_owner'))
neutron_db = self.base_create_port(context, port)
port["port"].update(neutron_db)
@ -3447,45 +3350,27 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
self._extend_get_port_dict_qos_and_binding(context, original_port)
self._remove_provider_security_groups_from_list(original_port)
port_data = port['port']
nsx_lswitch_id, nsx_lport_id = nsx_db.get_nsx_switch_and_port_id(
context.session, id)
self._validate_qos_policy_id(
context, port_data.get(qos_consts.QOS_POLICY_ID))
is_external_net = self._network_is_external(
context, original_port['network_id'])
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
self._assert_on_external_net_port_with_qos(port_data)
# Validate the changes
self._validate_update_port(context, id, original_port, port_data)
self._assert_on_dhcp_relay_without_router(context, port_data,
original_port)
is_ens_tz_port = self._is_ens_tz_port(context, original_port)
qos_selected = validators.is_attr_set(port_data.get
(qos_consts.QOS_POLICY_ID))
if is_ens_tz_port:
if qos_selected:
err_msg = _("Cannot configure QOS on ENS networks")
raise n_exc.InvalidInput(error_message=err_msg)
if is_ens_tz_port and qos_selected:
err_msg = _("Cannot configure QOS on ENS networks")
raise n_exc.InvalidInput(error_message=err_msg)
dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS)
self._validate_extra_dhcp_options(dhcp_opts)
device_owner = (port_data['device_owner']
if 'device_owner' in port_data
else original_port.get('device_owner'))
self._assert_on_device_owner_change(
port_data, original_port.get('device_owner'))
self._assert_on_illegal_port_with_qos(
port_data, device_owner)
self._assert_on_port_admin_state(port_data, device_owner)
self._assert_on_port_sec_change(port_data, device_owner)
self._validate_max_ips_per_port(
port_data.get('fixed_ips', []), device_owner)
self._assert_on_vpn_port_change(original_port)
direct_vnic_type = self._validate_port_vnic_type(
context, port_data, original_port['network_id'])
# Update the neutron port
updated_port = super(NsxV3Plugin, self).update_port(context,
id, port)
self._extension_manager.process_update_port(context, port_data,