diff --git a/vmware_nsx/plugins/dvs/plugin.py b/vmware_nsx/plugins/dvs/plugin.py index 4971c0ea3a..b1a2ddaaf7 100644 --- a/vmware_nsx/plugins/dvs/plugin.py +++ b/vmware_nsx/plugins/dvs/plugin.py @@ -15,8 +15,10 @@ import uuid +from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import port as port_def from neutron_lib.api.definitions import port_security as psec +from neutron_lib.exceptions import allowedaddresspairs as addr_exc from neutron_lib.exceptions import port_security as psec_exc from oslo_log import log as logging from oslo_utils import excutils @@ -36,7 +38,6 @@ from neutron.db import portbindings_db from neutron.db import portsecurity_db from neutron.db import securitygroups_db from neutron.db import vlantransparent_db as vlan_ext_db -from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import multiprovidernet as mpnet from neutron.extensions import providernet from neutron.extensions import securitygroup as ext_sg @@ -369,16 +370,17 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, port_data) # allowed address pair checks - if validators.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)): + if validators.is_attr_set(port_data.get( + addr_apidef.ADDRESS_PAIRS)): if not port_security: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() else: self._process_create_allowed_address_pairs( context, neutron_db, - port_data[addr_pair.ADDRESS_PAIRS]) + port_data[addr_apidef.ADDRESS_PAIRS]) else: # remove ATTR_NOT_SPECIFIED - port_data[addr_pair.ADDRESS_PAIRS] = [] + port_data[addr_apidef.ADDRESS_PAIRS] = [] self._process_portbindings_create_and_update(context, port['port'], @@ -422,19 +424,19 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin, if not ret_port[psec.PORTSECURITY]: # has address pairs in request if has_addr_pairs: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() elif not delete_addr_pairs: # check if address pairs are in db - ret_port[addr_pair.ADDRESS_PAIRS] = ( + ret_port[addr_apidef.ADDRESS_PAIRS] = ( self.get_allowed_address_pairs(context, id)) - if ret_port[addr_pair.ADDRESS_PAIRS]: - raise addr_pair.AddressPairAndPortSecurityRequired() + if ret_port[addr_apidef.ADDRESS_PAIRS]: + raise addr_exc.AddressPairAndPortSecurityRequired() if delete_addr_pairs or has_addr_pairs: # delete address pairs and read them in self._delete_allowed_address_pairs(context, id) self._process_create_allowed_address_pairs( - context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS]) + context, ret_port, ret_port[addr_apidef.ADDRESS_PAIRS]) if psec.PORTSECURITY in port['port']: self._process_port_port_security_update( diff --git a/vmware_nsx/plugins/nsx_mh/plugin.py b/vmware_nsx/plugins/nsx_mh/plugin.py index 960b2fb570..0402d783a7 100644 --- a/vmware_nsx/plugins/nsx_mh/plugin.py +++ b/vmware_nsx/plugins/nsx_mh/plugin.py @@ -15,6 +15,7 @@ import uuid +from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import external_net as extnet_apidef from neutron_lib.api.definitions import port_security as psec from neutron_lib.api import faults @@ -22,6 +23,7 @@ from neutron_lib.api import validators from neutron_lib import constants from neutron_lib import context as q_context from neutron_lib import exceptions as n_exc +from neutron_lib.exceptions import allowedaddresspairs as addr_exc from neutron_lib.exceptions import port_security as psec_exc from oslo_concurrency import lockutils from oslo_config import cfg @@ -57,7 +59,6 @@ from neutron.db import portbindings_db from neutron.db import portsecurity_db from neutron.db import quota_db # noqa from neutron.db import securitygroups_db -from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import extraroute from neutron.extensions import l3 from neutron.extensions import multiprovidernet as mpnet @@ -433,7 +434,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin, nsx_sec_profile_ids, port_data.get(qos.QUEUE), port_data.get(mac_ext.MAC_LEARNING), - port_data.get(addr_pair.ADDRESS_PAIRS)) + port_data.get(addr_apidef.ADDRESS_PAIRS)) def _handle_create_port_exception(self, context, port_id, ls_uuid, lp_uuid): @@ -1128,16 +1129,17 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin, self._process_port_port_security_create( context, port_data, neutron_db) # allowed address pair checks - if validators.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)): + if validators.is_attr_set(port_data.get( + addr_apidef.ADDRESS_PAIRS)): if not port_security: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() else: self._process_create_allowed_address_pairs( context, neutron_db, - port_data[addr_pair.ADDRESS_PAIRS]) + port_data[addr_apidef.ADDRESS_PAIRS]) else: # remove ATTR_NOT_SPECIFIED - port_data[addr_pair.ADDRESS_PAIRS] = [] + port_data[addr_apidef.ADDRESS_PAIRS] = [] # security group extension checks # NOTE: check_update_has_security_groups works fine for @@ -1234,19 +1236,19 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin, if not ret_port[psec.PORTSECURITY]: # has address pairs in request if has_addr_pairs: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() elif not delete_addr_pairs: # check if address pairs are in db - ret_port[addr_pair.ADDRESS_PAIRS] = ( + ret_port[addr_apidef.ADDRESS_PAIRS] = ( self.get_allowed_address_pairs(context, id)) - if ret_port[addr_pair.ADDRESS_PAIRS]: - raise addr_pair.AddressPairAndPortSecurityRequired() + if ret_port[addr_apidef.ADDRESS_PAIRS]: + raise addr_exc.AddressPairAndPortSecurityRequired() if (delete_addr_pairs or has_addr_pairs): # delete address pairs and read them in self._delete_allowed_address_pairs(context, id) self._process_create_allowed_address_pairs( - context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS]) + context, ret_port, ret_port[addr_apidef.ADDRESS_PAIRS]) # checks if security groups were updated adding/modifying # security groups, port security is set and port has ip if not (has_ip and ret_port[psec.PORTSECURITY]): @@ -1313,7 +1315,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin, nsx_sec_profile_ids, ret_port[qos.QUEUE], ret_port.get(mac_ext.MAC_LEARNING), - ret_port.get(addr_pair.ADDRESS_PAIRS)) + ret_port.get(addr_apidef.ADDRESS_PAIRS)) # Update the port status from nsx. If we fail here hide it # since the port was successfully updated but we were not diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 4a644c9c28..fff5663554 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -17,6 +17,8 @@ from distutils import version import uuid import netaddr + +from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import availability_zone as az_def from neutron_lib.api.definitions import external_net as extnet_apidef from neutron_lib.api.definitions import extra_dhcp_opt as ext_edo @@ -34,6 +36,7 @@ from neutron_lib import constants from neutron_lib import context as n_context from neutron_lib.db import constants as db_const from neutron_lib import exceptions as n_exc +from neutron_lib.exceptions import allowedaddresspairs as addr_exc from neutron_lib.exceptions import port_security as psec_exc from neutron_lib.plugins import constants as plugin_const from neutron_lib.plugins import directory @@ -73,7 +76,6 @@ from neutron.db import portsecurity_db from neutron.db import quota_db # noqa from neutron.db import securitygroups_db from neutron.db import vlantransparent_db -from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import flavors from neutron.extensions import l3 from neutron.extensions import multiprovidernet as mpnet @@ -1657,7 +1659,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, return net_res def _validate_address_pairs(self, attrs, db_port): - for ap in attrs[addr_pair.ADDRESS_PAIRS]: + for ap in attrs[addr_apidef.ADDRESS_PAIRS]: # Check that the IP address is a subnet if len(ap['ip_address'].split('/')) > 1: msg = _('NSXv does not support CIDR as address pairs') @@ -1763,11 +1765,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, attrs = port[port_def.RESOURCE_NAME] if self._check_update_has_allowed_address_pairs(port): if not port_security: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() self._validate_address_pairs(attrs, neutron_db) else: # remove ATTR_NOT_SPECIFIED - attrs[addr_pair.ADDRESS_PAIRS] = [] + attrs[addr_apidef.ADDRESS_PAIRS] = [] # security group extension checks if has_ip and port_security: @@ -1784,10 +1786,10 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, port_data, ssgids) - neutron_db[addr_pair.ADDRESS_PAIRS] = ( + neutron_db[addr_apidef.ADDRESS_PAIRS] = ( self._process_create_allowed_address_pairs( context, neutron_db, - attrs.get(addr_pair.ADDRESS_PAIRS))) + attrs.get(addr_apidef.ADDRESS_PAIRS))) self._process_port_create_extra_dhcp_opts( context, port_data, dhcp_opts) @@ -1967,7 +1969,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, port_data = port['port'] dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS) self._validate_extra_dhcp_options(dhcp_opts) - if addr_pair.ADDRESS_PAIRS in attrs: + if addr_apidef.ADDRESS_PAIRS in attrs: self._validate_address_pairs(attrs, original_port) self._validate_max_ips_per_port( port_data.get('fixed_ips', []), @@ -2090,7 +2092,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, context, port, original_port, ret_port) update_assigned_addresses = False - if addr_pair.ADDRESS_PAIRS in attrs: + if addr_apidef.ADDRESS_PAIRS in attrs: update_assigned_addresses = self.update_address_pairs_on_port( context, id, port, original_port, ret_port) @@ -4319,7 +4321,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, approved_addrs = [addr['ip_address'] for addr in port['fixed_ips']] # add in the address pair approved_addrs.extend( - addr['ip_address'] for addr in port[addr_pair.ADDRESS_PAIRS]) + addr['ip_address'] for addr in port[addr_apidef.ADDRESS_PAIRS]) # add the IPv6 link-local address if there is an IPv6 address if any([netaddr.valid_ipv6(address) for address in approved_addrs]): lla = str(netutils.get_ipv6_addr_by_EUI64( diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index 2585256af7..b19ceadacd 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -14,12 +14,14 @@ # under the License. import netaddr +from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import availability_zone as az_def from neutron_lib.api.definitions import external_net as extnet_apidef from neutron_lib.api.definitions import network as net_def from neutron_lib.api.definitions import port_security as psec from neutron_lib.api import faults from neutron_lib.api.validators import availability_zone as az_validator +from neutron_lib.exceptions import allowedaddresspairs as addr_exc from neutron_lib.exceptions import port_security as psec_exc from neutron_lib.services.qos import constants as qos_consts @@ -48,7 +50,6 @@ from neutron.db import models_v2 from neutron.db import portbindings_db from neutron.db import portsecurity_db from neutron.db import securitygroups_db -from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import l3 from neutron.extensions import providernet from neutron.extensions import securitygroup as ext_sg @@ -1614,7 +1615,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, address_bindings.append(nsx_resources.PacketAddressClassifier( fixed_ip['ip_address'], port['mac_address'], None)) - for pair in port.get(addr_pair.ADDRESS_PAIRS): + for pair in port.get(addr_apidef.ADDRESS_PAIRS): address_bindings.append(nsx_resources.PacketAddressClassifier( pair['ip_address'], pair['mac_address'], None)) @@ -1776,7 +1777,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, mac_learning_profile_set = False if psec_is_on: - address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS) + address_pairs = port_data.get(addr_apidef.ADDRESS_PAIRS) if validators.is_attr_set(address_pairs) and address_pairs: mac_learning_profile_set = True profiles.append(self._get_port_security_profile_id()) @@ -1884,10 +1885,10 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, context, port_data, neutron_db) # allowed address pair checks - address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS) + address_pairs = port_data.get(addr_apidef.ADDRESS_PAIRS) if validators.is_attr_set(address_pairs): if not port_security: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() else: self._validate_address_pairs(address_pairs) self._process_create_allowed_address_pairs( @@ -1895,7 +1896,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, address_pairs) else: # remove ATTR_NOT_SPECIFIED - port_data[addr_pair.ADDRESS_PAIRS] = [] + port_data[addr_apidef.ADDRESS_PAIRS] = [] if port_security and has_ip: self._ensure_default_security_group_on_port(context, port) @@ -2534,22 +2535,22 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, if not updated_port[psec.PORTSECURITY]: # has address pairs in request if has_addr_pairs: - raise addr_pair.AddressPairAndPortSecurityRequired() + raise addr_exc.AddressPairAndPortSecurityRequired() elif not delete_addr_pairs: # check if address pairs are in db - updated_port[addr_pair.ADDRESS_PAIRS] = ( + updated_port[addr_apidef.ADDRESS_PAIRS] = ( self.get_allowed_address_pairs(context, id)) - if updated_port[addr_pair.ADDRESS_PAIRS]: - raise addr_pair.AddressPairAndPortSecurityRequired() + if updated_port[addr_apidef.ADDRESS_PAIRS]: + raise addr_exc.AddressPairAndPortSecurityRequired() if delete_addr_pairs or has_addr_pairs: self._validate_address_pairs( - updated_port[addr_pair.ADDRESS_PAIRS]) + updated_port[addr_apidef.ADDRESS_PAIRS]) # delete address pairs and read them in self._delete_allowed_address_pairs(context, id) self._process_create_allowed_address_pairs( context, updated_port, - updated_port[addr_pair.ADDRESS_PAIRS]) + updated_port[addr_apidef.ADDRESS_PAIRS]) # No port security is allowed if the port belongs to an ENS TZ if (updated_port[psec.PORTSECURITY] and @@ -2709,7 +2710,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, psec_is_on = self._get_port_security_profile_id() in switch_profile_ids - address_pairs = updated_port.get(addr_pair.ADDRESS_PAIRS) + address_pairs = updated_port.get(addr_apidef.ADDRESS_PAIRS) mac_learning_profile_set = ( validators.is_attr_set(address_pairs) and address_pairs and psec_is_on) @@ -2867,9 +2868,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, # revert allowed address pairs if port_security: orig_pair = original_port.get( - addr_pair.ADDRESS_PAIRS) + addr_apidef.ADDRESS_PAIRS) updated_pair = updated_port.get( - addr_pair.ADDRESS_PAIRS) + addr_apidef.ADDRESS_PAIRS) if orig_pair != updated_pair: self._delete_allowed_address_pairs(context, id) if orig_pair: diff --git a/vmware_nsx/shell/admin/plugins/nsxv3/resources/ports.py b/vmware_nsx/shell/admin/plugins/nsxv3/resources/ports.py index e1fe0af71e..5b4b024278 100644 --- a/vmware_nsx/shell/admin/plugins/nsxv3/resources/ports.py +++ b/vmware_nsx/shell/admin/plugins/nsxv3/resources/ports.py @@ -37,7 +37,7 @@ from neutron.db import allowedaddresspairs_db as addr_pair_db from neutron.db import db_base_plugin_v2 from neutron.db import l3_db from neutron.db import portsecurity_db -from neutron.extensions import allowedaddresspairs +from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.callbacks import registry from neutron_lib import constants as const from neutron_lib import context as neutron_context @@ -180,7 +180,7 @@ def list_missing_ports(resource, event, trigger, **kwargs): # neutron spoofguard profile should be attached port_sec, has_ip = plugin._determine_port_security_and_has_ip( admin_cxt, port) - addr_pair = port.get(allowedaddresspairs.ADDRESS_PAIRS) + addr_pair = port.get(addr_apidef.ADDRESS_PAIRS) if port_sec and (has_ip or addr_pair): prf_id = profiles_dict[spoofguard_profile_key] if prf_id != spoofguard_profile_id: diff --git a/vmware_nsx/tests/unit/extensions/test_addresspairs.py b/vmware_nsx/tests/unit/extensions/test_addresspairs.py index 30e9c28099..8b031ae597 100644 --- a/vmware_nsx/tests/unit/extensions/test_addresspairs.py +++ b/vmware_nsx/tests/unit/extensions/test_addresspairs.py @@ -12,10 +12,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import port_security as psec from oslo_config import cfg -from neutron.extensions import allowedaddresspairs as addr_pair from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs from vmware_nsx.tests.unit.nsx_mh import test_plugin as test_nsx_plugin @@ -33,7 +34,7 @@ class TestAllowedAddressPairsNSXv2(test_nsx_plugin.NsxPluginV2TestCase, with self.network() as net: res = self._create_port(self.fmt, net['network']['id']) port = self.deserialize(self.fmt, res) - self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], []) + self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], []) self._delete('ports', port['port']['id']) def test_create_port_security_false_allowed_address_pairs(self): @@ -60,7 +61,7 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin, port = self.deserialize(self.fmt, res) address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1/24'}] - update_port = {'port': {addr_pair.ADDRESS_PAIRS: + update_port = {'port': {addr_apidef.ADDRESS_PAIRS: address_pairs}} req = self.new_update_request('ports', update_port, port['port']['id']) @@ -95,7 +96,7 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, fixed_ips = [{'subnet_id': subnet['subnet']['id'], 'ip_address': '10.0.0.2'}] res = self._create_port(self.fmt, network['network']['id'], - arg_list=(addr_pair.ADDRESS_PAIRS, + arg_list=(addr_apidef.ADDRESS_PAIRS, 'fixed_ips'), allowed_address_pairs=address_pairs, fixed_ips=fixed_ips) @@ -107,11 +108,11 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, with self.network() as net: address_pairs = [{'ip_address': '10.0.0.1'}] res = self._create_port(self.fmt, net['network']['id'], - arg_list=(addr_pair.ADDRESS_PAIRS,), + arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] - self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], + self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) @@ -119,14 +120,14 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, with self.network() as net: address_pairs = [{'ip_address': '10.0.0.1'}] res = self._create_port(self.fmt, net['network']['id'], - arg_list=(addr_pair.ADDRESS_PAIRS,), + arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) - update_port = {'port': {addr_pair.ADDRESS_PAIRS: []}} + update_port = {'port': {addr_apidef.ADDRESS_PAIRS: []}} req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize(self.fmt, req.get_response(self.api)) - self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], []) + self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], []) self._delete('ports', port['port']['id']) def test_update_add_address_pairs(self): @@ -134,13 +135,13 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, res = self._create_port(self.fmt, net['network']['id']) port = self.deserialize(self.fmt, res) address_pairs = [{'ip_address': '10.0.0.1'}] - update_port = {'port': {addr_pair.ADDRESS_PAIRS: + update_port = {'port': {addr_apidef.ADDRESS_PAIRS: address_pairs}} req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize(self.fmt, req.get_response(self.api)) address_pairs[0]['mac_address'] = port['port']['mac_address'] - self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], + self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) @@ -164,12 +165,12 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, address_pairs = [{'ip_address': '10.0.0.1'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=('port_security_enabled', - addr_pair.ADDRESS_PAIRS,), + addr_apidef.ADDRESS_PAIRS,), port_security_enabled=True, allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertTrue(port['port'][psec.PORTSECURITY]) address_pairs[0]['mac_address'] = port['port']['mac_address'] - self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], + self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) diff --git a/vmware_nsx/tests/unit/nsx_v/test_plugin.py b/vmware_nsx/tests/unit/nsx_v/test_plugin.py index 0e28d2b92b..da2090528d 100644 --- a/vmware_nsx/tests/unit/nsx_v/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v/test_plugin.py @@ -21,7 +21,6 @@ import mock import netaddr from neutron.api.v2 import attributes from neutron.extensions import address_scope -from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import dvr as dist_router from neutron.extensions import l3 from neutron.extensions import l3_ext_gw_mode @@ -41,6 +40,7 @@ import neutron.tests.unit.extensions.test_portsecurity as test_psec import neutron.tests.unit.extensions.test_securitygroup as ext_sg from neutron.tests.unit import testlib_api from neutron_lib.api.definitions import address_scope as addr_apidef +from neutron_lib.api.definitions import allowedaddresspairs as addrp_apidef from neutron_lib.api.definitions import external_net as extnet_apidef from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext from neutron_lib.api.definitions import port_security as psec @@ -4462,7 +4462,7 @@ class TestNSXvAllowedAddressPairs(NsxVPluginV2TestCase, 'ip_address': '192.168.1.0/24'}] self._create_port(self.fmt, net['network']['id'], expected_res_status=webob.exc.HTTPBadRequest.code, - arg_list=(addr_pair.ADDRESS_PAIRS,), + arg_list=(addrp_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs)