From 630a78546db3f698b9c6b37751a020c14a83943f Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Thu, 27 Apr 2017 12:49:44 +0300 Subject: [PATCH] NSX-V| add IPv6 link-local address to spoofguard Change-Id: I58d6e13bcd600720e7a053a13ec3c37b819e17af --- vmware_nsx/plugins/nsx_v/plugin.py | 6 ++ vmware_nsx/tests/unit/nsx_v/test_plugin.py | 66 +++++++++++++++++++--- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 0d866f06d5..79ce7f8a37 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -29,6 +29,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils import excutils +from oslo_utils import netutils from oslo_utils import uuidutils from sqlalchemy.orm import exc as sa_exc @@ -4087,6 +4088,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, # add in the address pair approved_addrs.extend( addr['ip_address'] for addr in port[addr_pair.ADDRESS_PAIRS]) + # add the IPv6 link-local address if there is an IPv6 address + if any([netaddr.valid_ipv6(address) for address in approved_addrs]): + lla = str(netutils.get_ipv6_addr_by_EUI64( + constants.IPv6_LLA_PREFIX, mac_addr)) + approved_addrs.append(lla) self.nsx_v.vcns.approve_assigned_addresses( sg_policy_id, vnic_id, mac_addr, approved_addrs) self.nsx_v.vcns.publish_assigned_addresses(sg_policy_id, vnic_id) diff --git a/vmware_nsx/tests/unit/nsx_v/test_plugin.py b/vmware_nsx/tests/unit/nsx_v/test_plugin.py index 97612bb9f4..6949a61d08 100644 --- a/vmware_nsx/tests/unit/nsx_v/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v/test_plugin.py @@ -972,37 +972,85 @@ class TestPortsV2(NsxVPluginV2TestCase, return res @mock.patch.object(edge_utils.EdgeManager, 'delete_dhcp_binding') - def test_update_port_index(self, delete_dhcp_binding): + def _test_update_port_index_and_spoofguard( + self, ip_version, subnet_cidr, port_ip, port_mac, ipv6_lla, + delete_dhcp_binding): q_context = context.Context('', 'tenant_1') device_id = _uuid() - with self.subnet() as subnet: + with self.subnet(ip_version=ip_version, + enable_dhcp=(False if ip_version == 6 else True), + cidr=subnet_cidr, + gateway_ip=None) as subnet: + fixed_ip_data = [{'ip_address': port_ip, + 'subnet_id': subnet['subnet']['id']}] with self.port(subnet=subnet, device_id=device_id, + mac_address=port_mac, + fixed_ips=fixed_ip_data, device_owner='compute:None') as port: self.assertIsNone(port['port']['vnic_index']) + self.fc2.approve_assigned_addresses = ( + mock.Mock().approve_assigned_addresses) + self.fc2.publish_assigned_addresses = ( + mock.Mock().publish_assigned_addresses) + self.fc2.inactivate_vnic_assigned_addresses = ( + mock.Mock().inactivate_vnic_assigned_addresses) vnic_index = 3 res = self._update_port_index( port['port']['id'], device_id, vnic_index) self.assertEqual(vnic_index, res['port']['vnic_index']) + policy_id = nsxv_db.get_spoofguard_policy_id( + q_context.session, port['port']['network_id']) + vnic_id = '%s.%03d' % (device_id, vnic_index) + + # Verify that the spoofguard policy assigned and published + expected_ips = [port_ip] + if ipv6_lla: + expected_ips.append(ipv6_lla) + (self.fc2.approve_assigned_addresses. + assert_called_once_with(policy_id, vnic_id, port_mac, + expected_ips)) + (self.fc2.publish_assigned_addresses. + assert_called_once_with(policy_id, vnic_id)) + # Updating the vnic_index to None implies the vnic does # no longer obtain the addresses associated with this port, # we need to inactivate previous addresses configurations for # this vnic in the context of this network spoofguard policy. - self.fc2.inactivate_vnic_assigned_addresses = ( - mock.Mock().inactivate_vnic_assigned_addresses) - - policy_id = nsxv_db.get_spoofguard_policy_id( - q_context.session, port['port']['network_id']) - res = self._update_port_index(port['port']['id'], '', None) - vnic_id = '%s.%03d' % (device_id, vnic_index) (self.fc2.inactivate_vnic_assigned_addresses. assert_called_once_with(policy_id, vnic_id)) self.assertTrue(delete_dhcp_binding.called) + def test_update_port_index(self): + ip_version = 4 + subnet_cidr = '10.0.0.0/24' + port_ip = '10.0.0.8' + port_mac = '00:00:00:00:00:02' + ipv6_lla = None + self._test_update_port_index_and_spoofguard( + ip_version, + subnet_cidr, + port_ip, + port_mac, + ipv6_lla) + + def test_update_port_index_ipv6(self): + ip_version = 6 + subnet_cidr = 'ae80::/64' + port_mac = '00:00:00:00:00:02' + ipv6_lla = 'fe80::200:ff:fe00:2' + port_ip = 'ae80::2' + self._test_update_port_index_and_spoofguard( + ip_version, + subnet_cidr, + port_ip, + port_mac, + ipv6_lla) + def test_update_port_with_compute_device_owner(self): """ Test that DHCP binding is created when ports 'device_owner'