Merge "DVS: Add plugin validations"

This commit is contained in:
Zuul 2019-05-28 10:54:54 +00:00 committed by Gerrit Code Review
commit e702f7116e
2 changed files with 91 additions and 10 deletions

View File

@ -275,7 +275,10 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
segmentation_id = net_data.get(pnet.SEGMENTATION_ID)
segmentation_id_set = validators.is_attr_set(segmentation_id)
physical_network = net_data.get(pnet.PHYSICAL_NETWORK)
physical_network_set = validators.is_attr_set(physical_network)
if network_type == 'vlan':
if not physical_network_set:
physical_network = dvs_utils.dvs_name_get()
bindings = nsx_db.get_network_bindings_by_vlanid_and_physical_net(
context.session, segmentation_id, physical_network)
if bindings:
@ -286,6 +289,14 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
err_msg = _("Only an admin can create a DVS provider "
"network")
raise n_exc.InvalidInput(error_message=err_msg)
external = net_data.get(enet_apidef.EXTERNAL)
is_external_net = validators.is_attr_set(external) and external
if is_external_net:
err_msg = _("External network cannot be created with dvs based "
"port groups")
raise n_exc.InvalidInput(error_message=err_msg)
err_msg = None
if not network_type_set:
err_msg = _("Network provider information must be "
@ -295,7 +306,11 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
network_type == c_utils.NetworkTypes.PORTGROUP):
if segmentation_id_set:
err_msg = (_("Segmentation ID cannot be specified with "
"%s network type"), network_type)
"%s network type") % network_type)
if (network_type == c_utils.NetworkTypes.PORTGROUP and
not physical_network_set):
err_msg = (_("Physical network must be specified with "
"%s network type") % network_type)
elif network_type == c_utils.NetworkTypes.VLAN:
if not segmentation_id_set:
err_msg = _("Segmentation ID must be specified with "
@ -356,6 +371,10 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
self._extend_get_network_dict_provider(context, net_result)
return db_utils.resource_fields(net_result, fields)
def _dvs_get_network_type(self, context, id, fields=None):
net = self._dvs_get_network(context, id, fields=fields)
return net[pnet.NETWORK_TYPE]
def get_network(self, context, id, fields=None):
return self._dvs_get_network(context, id, fields=None)
@ -411,14 +430,15 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
# ATTR_NOT_SPECIFIED is for the case where a port is created on a
# shared network that is not owned by the tenant.
port_data = port['port']
network_type = self._dvs_get_network(context, port['port'][
'network_id'])['provider:network_type']
network_type = self._dvs_get_network_type(context, port['port'][
'network_id'])
with db_api.CONTEXT_WRITER.using(context):
# First we allocate port in neutron database
neutron_db = super(NsxDvsV2, self).create_port(context, port)
self._extension_manager.process_create_port(
context, port_data, neutron_db)
if network_type and network_type == 'vlan':
# Not allowed to enable port security on vlan DVS ports
port_data[psec.PORTSECURITY] = False
else:
port_security = self._get_network_security_binding(
@ -496,13 +516,19 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
ret_port.update(port['port'])
# populate port_security setting, ignoring vlan network ports.
network_type = self._dvs_get_network(context,
ret_port['network_id'])[
'provider:network_type']
network_type = self._dvs_get_network_type(context,
ret_port['network_id'])
if (psec.PORTSECURITY not in port['port'] and network_type !=
'vlan'):
ret_port[psec.PORTSECURITY] = self._get_port_security_binding(
context, id)
elif (network_type == 'vlan' and
psec.PORTSECURITY in port['port'] and
port['port'][psec.PORTSECURITY]):
# Not allowed to enable port security on vlan DVS ports
err_msg = _("Cannot enable port security on port %s") % id
raise n_exc.InvalidInput(error_message=err_msg)
# validate port security and allowed address pairs
if not ret_port[psec.PORTSECURITY]:
# has address pairs in request

View File

@ -237,6 +237,42 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
vlan_tag=7)
self.assertTrue(fake_pg_info.call_count)
def test_create_dvs_vlan_network_no_physical_network(self):
params = {'provider:network_type': 'vlan',
'provider:segmentation_id': 10,
'admin_state_up': True,
'name': 'fake-name',
'tenant_id': 'fake_tenant',
'shared': False,
'port_security_enabled': False}
params['arg_list'] = tuple(params.keys())
ctx = context.get_admin_context()
with mock.patch.object(self._plugin._dvs, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name',
return_value=mock.MagicMock()):
network = self._plugin.create_network(ctx, {'network': params})
# Should work and take the default dvs
self.assertIn('id', network)
def test_create_dvs_pg_network_no_physical_network(self):
params = {'provider:network_type': 'portgroup',
'provider:segmentation_id': 10,
'admin_state_up': True,
'name': 'fake-name',
'tenant_id': 'fake_tenant',
'shared': False,
'port_security_enabled': False}
params['arg_list'] = tuple(params.keys())
ctx = context.get_admin_context()
with mock.patch.object(self._plugin._dvs, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name',
return_value=mock.MagicMock()):
self.assertRaises(exp.InvalidInput,
self._plugin.create_network,
ctx, {'network': params})
def test_create_and_delete_dvs_port(self):
params = {'provider:network_type': 'vlan',
'provider:physical_network': 'dvs',
@ -294,10 +330,7 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
port_security = res['port']['port_security_enabled']
security_groups = res['port']['security_groups']
self.assertEqual(port_security, False)
self.assertEqual(security_groups, [])
self.assertIn('NeutronError', res)
def test_create_router_only_dvs_backend(self):
data = {'router': {'tenant_id': 'whatever'}}
@ -403,3 +436,25 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
self._plugin.create_network(ctx, {'network': params})
self.assertRaises(exp.InvalidInput, self._plugin.create_network,
ctx, {'network': params})
def test_create_external_network_fail(self):
params = {'provider:network_type': 'vlan',
'admin_state_up': True,
'name': 'test_net',
'tenant_id': 'fake_tenant',
'router:external': True,
'shared': False,
'provider:physical_network': 'fake-moid',
'provider:segmentation_id': 7,
'port_security_enabled': False}
with mock.patch.object(self._plugin._dvs,
'add_port_group'),\
mock.patch.object(dvs.DvsManager,
'add_port_group'),\
mock.patch.object(dvs.DvsManager,
'get_dvs_moref_by_name',
return_value=mock.MagicMock()):
ctx = context.get_admin_context()
self.assertRaises(exp.InvalidInput, self._plugin.create_network,
ctx, {'network': params})