diff --git a/vmware_nsx/plugins/common_v3/plugin.py b/vmware_nsx/plugins/common_v3/plugin.py index 97d5b9fd47..14000c05b7 100644 --- a/vmware_nsx/plugins/common_v3/plugin.py +++ b/vmware_nsx/plugins/common_v3/plugin.py @@ -306,15 +306,29 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, context.elevated(), router_db.id, gw_network_id, interface_subnet['id'], subnet=interface_subnet) - def _validate_address_pairs(self, address_pairs): + def _validate_address_pairs(self, address_pairs, fixed_ips=None): + port_ips = [] + if fixed_ips: + # Make sure there are no duplications + for fixed_ip in fixed_ips: + port_ips.append(fixed_ip['ip_address']) + for pair in address_pairs: ip = pair.get('ip_address') # Validate ipv4 cidrs (No limitation on ipv6): if ':' not in ip: if len(ip.split('/')) > 1 and ip.split('/')[1] != '32': - LOG.error("cidr %s is not supported in allowed address " + LOG.error("Cidr %s is not supported in allowed address " "pairs", ip) raise nsx_exc.InvalidIPAddress(ip_address=ip) + if ip in port_ips: + err_msg = (_("Port cannot have duplicate values %s as part of " + "port manual bindings") % ip) + raise n_exc.InvalidInput(error_message=err_msg) + if ip in ['127.0.0.0', '0.0.0.0', '::']: + LOG.error("IP %s is not supported in allowed address " + "pairs", ip) + raise nsx_exc.InvalidIPAddress(ip_address=ip) def _validate_number_of_address_pairs(self, port): address_pairs = port.get(addr_apidef.ADDRESS_PAIRS) @@ -344,7 +358,8 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, if not port_security: raise addr_exc.AddressPairAndPortSecurityRequired() else: - self._validate_address_pairs(address_pairs) + self._validate_address_pairs( + address_pairs, fixed_ips=port_data.get('fixed_ips')) self._validate_number_of_address_pairs(port_data) self._process_create_allowed_address_pairs(context, port_data, address_pairs) @@ -426,7 +441,9 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, if delete_addr_pairs or has_addr_pairs: self._validate_address_pairs( - updated_port[addr_apidef.ADDRESS_PAIRS]) + updated_port[addr_apidef.ADDRESS_PAIRS], + fixed_ips=(updated_port.get('fixed_ips') or + port_data.get('fixed_ips'))) # delete address pairs and read them in self._delete_allowed_address_pairs(context, id) self._process_create_allowed_address_pairs( diff --git a/vmware_nsx/tests/unit/extensions/test_addresspairs.py b/vmware_nsx/tests/unit/extensions/test_addresspairs.py index 1b64d9d5ab..ee7c6663c1 100644 --- a/vmware_nsx/tests/unit/extensions/test_addresspairs.py +++ b/vmware_nsx/tests/unit/extensions/test_addresspairs.py @@ -19,6 +19,7 @@ from oslo_config import cfg from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs +from vmware_nsx.tests.unit.nsx_p import test_plugin as test_p_plugin from vmware_nsx.tests.unit.nsx_v import test_plugin as test_nsx_v_plugin from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin @@ -39,6 +40,56 @@ class TestAllowedAddressPairsNSXv2(test_v3_plugin.NsxV3PluginTestCaseMixin, def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') + def test_create_overlap_with_fixed_ip(self): + self.skipTest('Not supported') + + +class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin, + ext_pairs.TestAllowedAddressPairs): + + def setUp(self, plugin=test_p_plugin.PLUGIN_NAME, + ext_mgr=None, + service_plugins=None): + super(TestAllowedAddressPairsNSXp, self).setUp( + plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) + + def test_create_bad_address_pairs_with_cidr(self): + address_pairs = [{'mac_address': '00:00:00:00:00:01', + 'ip_address': '10.0.0.1/24'}] + self._create_port_with_address_pairs(address_pairs, 400) + + def test_create_port_allowed_address_pairs_v6(self): + with self.network() as net: + address_pairs = [{'ip_address': '1001::12'}] + res = self._create_port(self.fmt, net['network']['id'], + arg_list=(addr_apidef.ADDRESS_PAIRS,), + allowed_address_pairs=address_pairs) + port = self.deserialize(self.fmt, res) + address_pairs[0]['mac_address'] = port['port']['mac_address'] + self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], + address_pairs) + self._delete('ports', port['port']['id']) + + def test_update_add_bad_address_pairs_with_cidr(self): + with self.network() as net: + res = self._create_port(self.fmt, net['network']['id']) + port = self.deserialize(self.fmt, res) + address_pairs = [{'mac_address': '00:00:00:00:00:01', + 'ip_address': '10.0.0.1/24'}] + update_port = {'port': {addr_apidef.ADDRESS_PAIRS: + address_pairs}} + req = self.new_update_request('ports', update_port, + port['port']['id']) + res = req.get_response(self.api) + self.assertEqual(res.status_int, 400) + self._delete('ports', port['port']['id']) + + def test_create_port_security_false_allowed_address_pairs(self): + self.skipTest('TBD') + + def test_create_overlap_with_fixed_ip(self): + self.skipTest('Not supported') + class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin, ext_pairs.TestAllowedAddressPairs): @@ -83,6 +134,9 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin, def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') + def test_create_overlap_with_fixed_ip(self): + self.skipTest('Not supported') + class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, ext_pairs.TestAllowedAddressPairs): diff --git a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py index 9b79e6d035..af5e587309 100644 --- a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py @@ -54,6 +54,7 @@ from oslo_utils import uuidutils from webob import exc from vmware_nsx.api_client import exception as api_exc +from vmware_nsx.common import exceptions as nsx_exc from vmware_nsx.common import utils from vmware_nsx.db import db as nsx_db from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin @@ -1129,6 +1130,46 @@ class TestPortsV2(common_v3.NsxV3SubnetMixin, self.assertRaises(n_exc.InvalidInput, self.plugin.create_port, self.ctx, data) + def test_fail_create_allowed_address_pairs_dup(self): + with self.network() as network, self.subnet( + network=network, cidr="1.1.1.0/24", + enable_dhcp=True) as s1: + data = { + 'port': { + 'network_id': network['network']['id'], + 'tenant_id': self._tenant_id, + 'name': 'pair_port', + 'admin_state_up': True, + 'device_id': 'fake_device', + 'device_owner': 'fake_owner', + 'fixed_ips': [{'subnet_id': s1['subnet']['id'], + 'ip_address': '1.1.1.30'}] + } + } + data['port']['allowed_address_pairs'] = [ + {'ip_address': '1.1.1.30'}] + self.assertRaises(n_exc.InvalidInput, + self.plugin.create_port, self.ctx, data) + + def test_fail_create_allowed_address_pairs_illegal_ip(self): + with self.network() as network, self.subnet( + network=network, enable_dhcp=True) as s1: + data = { + 'port': { + 'network_id': network['network']['id'], + 'tenant_id': self._tenant_id, + 'name': 'pair_port', + 'admin_state_up': True, + 'device_id': 'fake_device', + 'device_owner': 'fake_owner', + 'fixed_ips': [{'subnet_id': s1['subnet']['id']}] + } + } + data['port']['allowed_address_pairs'] = [ + {'ip_address': '127.0.0.0'}] + self.assertRaises(nsx_exc.InvalidIPAddress, + self.plugin.create_port, self.ctx, data) + def test_fail_update_lb_port_with_fixed_ip(self): with self.network() as network: data = {'port': {