DVS: Add plugin validations

1. Do not allow to enable port-security on a vlan port
2. Do not allow creation of an external network
3. Validate that the physical network exists for portgroup network creation
4. Use default dvs as physical network for vlan netowrk validation

Change-Id: I3b738d2990794f35776859d1fbe509036084ec3a
This commit is contained in:
Adit Sarfaty 2019-05-26 14:12:11 +03:00
parent 4b3d0e9446
commit 1f820c6811
2 changed files with 91 additions and 10 deletions

View File

@ -275,7 +275,10 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
segmentation_id = net_data.get(pnet.SEGMENTATION_ID) segmentation_id = net_data.get(pnet.SEGMENTATION_ID)
segmentation_id_set = validators.is_attr_set(segmentation_id) segmentation_id_set = validators.is_attr_set(segmentation_id)
physical_network = net_data.get(pnet.PHYSICAL_NETWORK) physical_network = net_data.get(pnet.PHYSICAL_NETWORK)
physical_network_set = validators.is_attr_set(physical_network)
if network_type == 'vlan': if network_type == 'vlan':
if not physical_network_set:
physical_network = dvs_utils.dvs_name_get()
bindings = nsx_db.get_network_bindings_by_vlanid_and_physical_net( bindings = nsx_db.get_network_bindings_by_vlanid_and_physical_net(
context.session, segmentation_id, physical_network) context.session, segmentation_id, physical_network)
if bindings: if bindings:
@ -286,6 +289,14 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
err_msg = _("Only an admin can create a DVS provider " err_msg = _("Only an admin can create a DVS provider "
"network") "network")
raise n_exc.InvalidInput(error_message=err_msg) raise n_exc.InvalidInput(error_message=err_msg)
external = net_data.get(enet_apidef.EXTERNAL)
is_external_net = validators.is_attr_set(external) and external
if is_external_net:
err_msg = _("External network cannot be created with dvs based "
"port groups")
raise n_exc.InvalidInput(error_message=err_msg)
err_msg = None err_msg = None
if not network_type_set: if not network_type_set:
err_msg = _("Network provider information must be " err_msg = _("Network provider information must be "
@ -295,7 +306,11 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
network_type == c_utils.NetworkTypes.PORTGROUP): network_type == c_utils.NetworkTypes.PORTGROUP):
if segmentation_id_set: if segmentation_id_set:
err_msg = (_("Segmentation ID cannot be specified with " err_msg = (_("Segmentation ID cannot be specified with "
"%s network type"), network_type) "%s network type") % network_type)
if (network_type == c_utils.NetworkTypes.PORTGROUP and
not physical_network_set):
err_msg = (_("Physical network must be specified with "
"%s network type") % network_type)
elif network_type == c_utils.NetworkTypes.VLAN: elif network_type == c_utils.NetworkTypes.VLAN:
if not segmentation_id_set: if not segmentation_id_set:
err_msg = _("Segmentation ID must be specified with " err_msg = _("Segmentation ID must be specified with "
@ -356,6 +371,10 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
self._extend_get_network_dict_provider(context, net_result) self._extend_get_network_dict_provider(context, net_result)
return db_utils.resource_fields(net_result, fields) return db_utils.resource_fields(net_result, fields)
def _dvs_get_network_type(self, context, id, fields=None):
net = self._dvs_get_network(context, id, fields=fields)
return net[pnet.NETWORK_TYPE]
def get_network(self, context, id, fields=None): def get_network(self, context, id, fields=None):
return self._dvs_get_network(context, id, fields=None) return self._dvs_get_network(context, id, fields=None)
@ -411,14 +430,15 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
# ATTR_NOT_SPECIFIED is for the case where a port is created on a # ATTR_NOT_SPECIFIED is for the case where a port is created on a
# shared network that is not owned by the tenant. # shared network that is not owned by the tenant.
port_data = port['port'] port_data = port['port']
network_type = self._dvs_get_network(context, port['port'][ network_type = self._dvs_get_network_type(context, port['port'][
'network_id'])['provider:network_type'] 'network_id'])
with db_api.CONTEXT_WRITER.using(context): with db_api.CONTEXT_WRITER.using(context):
# First we allocate port in neutron database # First we allocate port in neutron database
neutron_db = super(NsxDvsV2, self).create_port(context, port) neutron_db = super(NsxDvsV2, self).create_port(context, port)
self._extension_manager.process_create_port( self._extension_manager.process_create_port(
context, port_data, neutron_db) context, port_data, neutron_db)
if network_type and network_type == 'vlan': if network_type and network_type == 'vlan':
# Not allowed to enable port security on vlan DVS ports
port_data[psec.PORTSECURITY] = False port_data[psec.PORTSECURITY] = False
else: else:
port_security = self._get_network_security_binding( port_security = self._get_network_security_binding(
@ -496,13 +516,19 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
ret_port.update(port['port']) ret_port.update(port['port'])
# populate port_security setting, ignoring vlan network ports. # populate port_security setting, ignoring vlan network ports.
network_type = self._dvs_get_network(context, network_type = self._dvs_get_network_type(context,
ret_port['network_id'])[ ret_port['network_id'])
'provider:network_type']
if (psec.PORTSECURITY not in port['port'] and network_type != if (psec.PORTSECURITY not in port['port'] and network_type !=
'vlan'): 'vlan'):
ret_port[psec.PORTSECURITY] = self._get_port_security_binding( ret_port[psec.PORTSECURITY] = self._get_port_security_binding(
context, id) context, id)
elif (network_type == 'vlan' and
psec.PORTSECURITY in port['port'] and
port['port'][psec.PORTSECURITY]):
# Not allowed to enable port security on vlan DVS ports
err_msg = _("Cannot enable port security on port %s") % id
raise n_exc.InvalidInput(error_message=err_msg)
# validate port security and allowed address pairs # validate port security and allowed address pairs
if not ret_port[psec.PORTSECURITY]: if not ret_port[psec.PORTSECURITY]:
# has address pairs in request # has address pairs in request

View File

@ -237,6 +237,42 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
vlan_tag=7) vlan_tag=7)
self.assertTrue(fake_pg_info.call_count) self.assertTrue(fake_pg_info.call_count)
def test_create_dvs_vlan_network_no_physical_network(self):
params = {'provider:network_type': 'vlan',
'provider:segmentation_id': 10,
'admin_state_up': True,
'name': 'fake-name',
'tenant_id': 'fake_tenant',
'shared': False,
'port_security_enabled': False}
params['arg_list'] = tuple(params.keys())
ctx = context.get_admin_context()
with mock.patch.object(self._plugin._dvs, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name',
return_value=mock.MagicMock()):
network = self._plugin.create_network(ctx, {'network': params})
# Should work and take the default dvs
self.assertIn('id', network)
def test_create_dvs_pg_network_no_physical_network(self):
params = {'provider:network_type': 'portgroup',
'provider:segmentation_id': 10,
'admin_state_up': True,
'name': 'fake-name',
'tenant_id': 'fake_tenant',
'shared': False,
'port_security_enabled': False}
params['arg_list'] = tuple(params.keys())
ctx = context.get_admin_context()
with mock.patch.object(self._plugin._dvs, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name',
return_value=mock.MagicMock()):
self.assertRaises(exp.InvalidInput,
self._plugin.create_network,
ctx, {'network': params})
def test_create_and_delete_dvs_port(self): def test_create_and_delete_dvs_port(self):
params = {'provider:network_type': 'vlan', params = {'provider:network_type': 'vlan',
'provider:physical_network': 'dvs', 'provider:physical_network': 'dvs',
@ -294,10 +330,7 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
req = self.new_update_request('ports', req = self.new_update_request('ports',
data, port['port']['id']) data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api)) res = self.deserialize('json', req.get_response(self.api))
port_security = res['port']['port_security_enabled'] self.assertIn('NeutronError', res)
security_groups = res['port']['security_groups']
self.assertEqual(port_security, False)
self.assertEqual(security_groups, [])
def test_create_router_only_dvs_backend(self): def test_create_router_only_dvs_backend(self):
data = {'router': {'tenant_id': 'whatever'}} data = {'router': {'tenant_id': 'whatever'}}
@ -403,3 +436,25 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
self._plugin.create_network(ctx, {'network': params}) self._plugin.create_network(ctx, {'network': params})
self.assertRaises(exp.InvalidInput, self._plugin.create_network, self.assertRaises(exp.InvalidInput, self._plugin.create_network,
ctx, {'network': params}) ctx, {'network': params})
def test_create_external_network_fail(self):
params = {'provider:network_type': 'vlan',
'admin_state_up': True,
'name': 'test_net',
'tenant_id': 'fake_tenant',
'router:external': True,
'shared': False,
'provider:physical_network': 'fake-moid',
'provider:segmentation_id': 7,
'port_security_enabled': False}
with mock.patch.object(self._plugin._dvs,
'add_port_group'),\
mock.patch.object(dvs.DvsManager,
'add_port_group'),\
mock.patch.object(dvs.DvsManager,
'get_dvs_moref_by_name',
return_value=mock.MagicMock()):
ctx = context.get_admin_context()
self.assertRaises(exp.InvalidInput, self._plugin.create_network,
ctx, {'network': params})