diff --git a/vmware_nsx/plugins/dvs/plugin.py b/vmware_nsx/plugins/dvs/plugin.py index 6ceb2b1958..deb0f73d42 100644 --- a/vmware_nsx/plugins/dvs/plugin.py +++ b/vmware_nsx/plugins/dvs/plugin.py @@ -275,7 +275,10 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, segmentation_id = net_data.get(pnet.SEGMENTATION_ID) segmentation_id_set = validators.is_attr_set(segmentation_id) physical_network = net_data.get(pnet.PHYSICAL_NETWORK) + physical_network_set = validators.is_attr_set(physical_network) if network_type == 'vlan': + if not physical_network_set: + physical_network = dvs_utils.dvs_name_get() bindings = nsx_db.get_network_bindings_by_vlanid_and_physical_net( context.session, segmentation_id, physical_network) if bindings: @@ -286,6 +289,14 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, err_msg = _("Only an admin can create a DVS provider " "network") raise n_exc.InvalidInput(error_message=err_msg) + + external = net_data.get(enet_apidef.EXTERNAL) + is_external_net = validators.is_attr_set(external) and external + if is_external_net: + err_msg = _("External network cannot be created with dvs based " + "port groups") + raise n_exc.InvalidInput(error_message=err_msg) + err_msg = None if not network_type_set: err_msg = _("Network provider information must be " @@ -295,7 +306,11 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, network_type == c_utils.NetworkTypes.PORTGROUP): if segmentation_id_set: err_msg = (_("Segmentation ID cannot be specified with " - "%s network type"), network_type) + "%s network type") % network_type) + if (network_type == c_utils.NetworkTypes.PORTGROUP and + not physical_network_set): + err_msg = (_("Physical network must be specified with " + "%s network type") % network_type) elif network_type == c_utils.NetworkTypes.VLAN: if not segmentation_id_set: err_msg = _("Segmentation ID must be specified with " @@ -356,6 +371,10 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, self._extend_get_network_dict_provider(context, net_result) return db_utils.resource_fields(net_result, fields) + def _dvs_get_network_type(self, context, id, fields=None): + net = self._dvs_get_network(context, id, fields=fields) + return net[pnet.NETWORK_TYPE] + def get_network(self, context, id, fields=None): return self._dvs_get_network(context, id, fields=None) @@ -411,14 +430,15 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, # ATTR_NOT_SPECIFIED is for the case where a port is created on a # shared network that is not owned by the tenant. port_data = port['port'] - network_type = self._dvs_get_network(context, port['port'][ - 'network_id'])['provider:network_type'] + network_type = self._dvs_get_network_type(context, port['port'][ + 'network_id']) with db_api.CONTEXT_WRITER.using(context): # First we allocate port in neutron database neutron_db = super(NsxDvsV2, self).create_port(context, port) self._extension_manager.process_create_port( context, port_data, neutron_db) if network_type and network_type == 'vlan': + # Not allowed to enable port security on vlan DVS ports port_data[psec.PORTSECURITY] = False else: port_security = self._get_network_security_binding( @@ -496,13 +516,19 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, ret_port.update(port['port']) # populate port_security setting, ignoring vlan network ports. - network_type = self._dvs_get_network(context, - ret_port['network_id'])[ - 'provider:network_type'] + network_type = self._dvs_get_network_type(context, + ret_port['network_id']) if (psec.PORTSECURITY not in port['port'] and network_type != 'vlan'): ret_port[psec.PORTSECURITY] = self._get_port_security_binding( context, id) + elif (network_type == 'vlan' and + psec.PORTSECURITY in port['port'] and + port['port'][psec.PORTSECURITY]): + # Not allowed to enable port security on vlan DVS ports + err_msg = _("Cannot enable port security on port %s") % id + raise n_exc.InvalidInput(error_message=err_msg) + # validate port security and allowed address pairs if not ret_port[psec.PORTSECURITY]: # has address pairs in request diff --git a/vmware_nsx/tests/unit/dvs/test_plugin.py b/vmware_nsx/tests/unit/dvs/test_plugin.py index 1d1b9a0049..e861f5fbee 100644 --- a/vmware_nsx/tests/unit/dvs/test_plugin.py +++ b/vmware_nsx/tests/unit/dvs/test_plugin.py @@ -237,6 +237,42 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase): vlan_tag=7) self.assertTrue(fake_pg_info.call_count) + def test_create_dvs_vlan_network_no_physical_network(self): + params = {'provider:network_type': 'vlan', + 'provider:segmentation_id': 10, + 'admin_state_up': True, + 'name': 'fake-name', + 'tenant_id': 'fake_tenant', + 'shared': False, + 'port_security_enabled': False} + params['arg_list'] = tuple(params.keys()) + ctx = context.get_admin_context() + with mock.patch.object(self._plugin._dvs, 'add_port_group'),\ + mock.patch.object(dvs.DvsManager, 'add_port_group'),\ + mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name', + return_value=mock.MagicMock()): + network = self._plugin.create_network(ctx, {'network': params}) + # Should work and take the default dvs + self.assertIn('id', network) + + def test_create_dvs_pg_network_no_physical_network(self): + params = {'provider:network_type': 'portgroup', + 'provider:segmentation_id': 10, + 'admin_state_up': True, + 'name': 'fake-name', + 'tenant_id': 'fake_tenant', + 'shared': False, + 'port_security_enabled': False} + params['arg_list'] = tuple(params.keys()) + ctx = context.get_admin_context() + with mock.patch.object(self._plugin._dvs, 'add_port_group'),\ + mock.patch.object(dvs.DvsManager, 'add_port_group'),\ + mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name', + return_value=mock.MagicMock()): + self.assertRaises(exp.InvalidInput, + self._plugin.create_network, + ctx, {'network': params}) + def test_create_and_delete_dvs_port(self): params = {'provider:network_type': 'vlan', 'provider:physical_network': 'dvs', @@ -294,10 +330,7 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase): req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize('json', req.get_response(self.api)) - port_security = res['port']['port_security_enabled'] - security_groups = res['port']['security_groups'] - self.assertEqual(port_security, False) - self.assertEqual(security_groups, []) + self.assertIn('NeutronError', res) def test_create_router_only_dvs_backend(self): data = {'router': {'tenant_id': 'whatever'}} @@ -403,3 +436,25 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase): self._plugin.create_network(ctx, {'network': params}) self.assertRaises(exp.InvalidInput, self._plugin.create_network, ctx, {'network': params}) + + def test_create_external_network_fail(self): + params = {'provider:network_type': 'vlan', + 'admin_state_up': True, + 'name': 'test_net', + 'tenant_id': 'fake_tenant', + 'router:external': True, + 'shared': False, + 'provider:physical_network': 'fake-moid', + 'provider:segmentation_id': 7, + 'port_security_enabled': False} + + with mock.patch.object(self._plugin._dvs, + 'add_port_group'),\ + mock.patch.object(dvs.DvsManager, + 'add_port_group'),\ + mock.patch.object(dvs.DvsManager, + 'get_dvs_moref_by_name', + return_value=mock.MagicMock()): + ctx = context.get_admin_context() + self.assertRaises(exp.InvalidInput, self._plugin.create_network, + ctx, {'network': params})