use addr pairs api def from lib
The allowed address pairs extension's API definition was rehomed into neutron-lib with I7958a2d6f470f088ca2cb8ad638c075788f22851 and will be consumed in neutron with I46cfeee9711973ec15881c8dc2bd0bf763bc2226 This patch switches the code over to use lib's version rather than neutrons. Change-Id: I3aa4c617d99fd0d3e89d20ea7bc58fad7b014629
This commit is contained in:
parent
cf80ee0cc9
commit
dfccac0afa
@ -15,8 +15,10 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.api.definitions import port as port_def
|
||||
from neutron_lib.api.definitions import port_security as psec
|
||||
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
|
||||
from neutron_lib.exceptions import port_security as psec_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
@ -36,7 +38,6 @@ from neutron.db import portbindings_db
|
||||
from neutron.db import portsecurity_db
|
||||
from neutron.db import securitygroups_db
|
||||
from neutron.db import vlantransparent_db as vlan_ext_db
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.extensions import multiprovidernet as mpnet
|
||||
from neutron.extensions import providernet
|
||||
from neutron.extensions import securitygroup as ext_sg
|
||||
@ -369,16 +370,17 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
port_data)
|
||||
|
||||
# allowed address pair checks
|
||||
if validators.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
|
||||
if validators.is_attr_set(port_data.get(
|
||||
addr_apidef.ADDRESS_PAIRS)):
|
||||
if not port_security:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
else:
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, neutron_db,
|
||||
port_data[addr_pair.ADDRESS_PAIRS])
|
||||
port_data[addr_apidef.ADDRESS_PAIRS])
|
||||
else:
|
||||
# remove ATTR_NOT_SPECIFIED
|
||||
port_data[addr_pair.ADDRESS_PAIRS] = []
|
||||
port_data[addr_apidef.ADDRESS_PAIRS] = []
|
||||
|
||||
self._process_portbindings_create_and_update(context,
|
||||
port['port'],
|
||||
@ -422,19 +424,19 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
if not ret_port[psec.PORTSECURITY]:
|
||||
# has address pairs in request
|
||||
if has_addr_pairs:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
elif not delete_addr_pairs:
|
||||
# check if address pairs are in db
|
||||
ret_port[addr_pair.ADDRESS_PAIRS] = (
|
||||
ret_port[addr_apidef.ADDRESS_PAIRS] = (
|
||||
self.get_allowed_address_pairs(context, id))
|
||||
if ret_port[addr_pair.ADDRESS_PAIRS]:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
if ret_port[addr_apidef.ADDRESS_PAIRS]:
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
|
||||
if delete_addr_pairs or has_addr_pairs:
|
||||
# delete address pairs and read them in
|
||||
self._delete_allowed_address_pairs(context, id)
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS])
|
||||
context, ret_port, ret_port[addr_apidef.ADDRESS_PAIRS])
|
||||
|
||||
if psec.PORTSECURITY in port['port']:
|
||||
self._process_port_port_security_update(
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.api.definitions import external_net as extnet_apidef
|
||||
from neutron_lib.api.definitions import port_security as psec
|
||||
from neutron_lib.api import faults
|
||||
@ -22,6 +23,7 @@ from neutron_lib.api import validators
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context as q_context
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
|
||||
from neutron_lib.exceptions import port_security as psec_exc
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
@ -57,7 +59,6 @@ from neutron.db import portbindings_db
|
||||
from neutron.db import portsecurity_db
|
||||
from neutron.db import quota_db # noqa
|
||||
from neutron.db import securitygroups_db
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.extensions import extraroute
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import multiprovidernet as mpnet
|
||||
@ -433,7 +434,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
nsx_sec_profile_ids,
|
||||
port_data.get(qos.QUEUE),
|
||||
port_data.get(mac_ext.MAC_LEARNING),
|
||||
port_data.get(addr_pair.ADDRESS_PAIRS))
|
||||
port_data.get(addr_apidef.ADDRESS_PAIRS))
|
||||
|
||||
def _handle_create_port_exception(self, context, port_id,
|
||||
ls_uuid, lp_uuid):
|
||||
@ -1128,16 +1129,17 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
self._process_port_port_security_create(
|
||||
context, port_data, neutron_db)
|
||||
# allowed address pair checks
|
||||
if validators.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
|
||||
if validators.is_attr_set(port_data.get(
|
||||
addr_apidef.ADDRESS_PAIRS)):
|
||||
if not port_security:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
else:
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, neutron_db,
|
||||
port_data[addr_pair.ADDRESS_PAIRS])
|
||||
port_data[addr_apidef.ADDRESS_PAIRS])
|
||||
else:
|
||||
# remove ATTR_NOT_SPECIFIED
|
||||
port_data[addr_pair.ADDRESS_PAIRS] = []
|
||||
port_data[addr_apidef.ADDRESS_PAIRS] = []
|
||||
|
||||
# security group extension checks
|
||||
# NOTE: check_update_has_security_groups works fine for
|
||||
@ -1234,19 +1236,19 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
if not ret_port[psec.PORTSECURITY]:
|
||||
# has address pairs in request
|
||||
if has_addr_pairs:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
elif not delete_addr_pairs:
|
||||
# check if address pairs are in db
|
||||
ret_port[addr_pair.ADDRESS_PAIRS] = (
|
||||
ret_port[addr_apidef.ADDRESS_PAIRS] = (
|
||||
self.get_allowed_address_pairs(context, id))
|
||||
if ret_port[addr_pair.ADDRESS_PAIRS]:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
if ret_port[addr_apidef.ADDRESS_PAIRS]:
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
|
||||
if (delete_addr_pairs or has_addr_pairs):
|
||||
# delete address pairs and read them in
|
||||
self._delete_allowed_address_pairs(context, id)
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS])
|
||||
context, ret_port, ret_port[addr_apidef.ADDRESS_PAIRS])
|
||||
# checks if security groups were updated adding/modifying
|
||||
# security groups, port security is set and port has ip
|
||||
if not (has_ip and ret_port[psec.PORTSECURITY]):
|
||||
@ -1313,7 +1315,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
nsx_sec_profile_ids,
|
||||
ret_port[qos.QUEUE],
|
||||
ret_port.get(mac_ext.MAC_LEARNING),
|
||||
ret_port.get(addr_pair.ADDRESS_PAIRS))
|
||||
ret_port.get(addr_apidef.ADDRESS_PAIRS))
|
||||
|
||||
# Update the port status from nsx. If we fail here hide it
|
||||
# since the port was successfully updated but we were not
|
||||
|
@ -17,6 +17,8 @@ from distutils import version
|
||||
import uuid
|
||||
|
||||
import netaddr
|
||||
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.api.definitions import availability_zone as az_def
|
||||
from neutron_lib.api.definitions import external_net as extnet_apidef
|
||||
from neutron_lib.api.definitions import extra_dhcp_opt as ext_edo
|
||||
@ -34,6 +36,7 @@ from neutron_lib import constants
|
||||
from neutron_lib import context as n_context
|
||||
from neutron_lib.db import constants as db_const
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
|
||||
from neutron_lib.exceptions import port_security as psec_exc
|
||||
from neutron_lib.plugins import constants as plugin_const
|
||||
from neutron_lib.plugins import directory
|
||||
@ -73,7 +76,6 @@ from neutron.db import portsecurity_db
|
||||
from neutron.db import quota_db # noqa
|
||||
from neutron.db import securitygroups_db
|
||||
from neutron.db import vlantransparent_db
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.extensions import flavors
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import multiprovidernet as mpnet
|
||||
@ -1657,7 +1659,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
return net_res
|
||||
|
||||
def _validate_address_pairs(self, attrs, db_port):
|
||||
for ap in attrs[addr_pair.ADDRESS_PAIRS]:
|
||||
for ap in attrs[addr_apidef.ADDRESS_PAIRS]:
|
||||
# Check that the IP address is a subnet
|
||||
if len(ap['ip_address'].split('/')) > 1:
|
||||
msg = _('NSXv does not support CIDR as address pairs')
|
||||
@ -1763,11 +1765,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
attrs = port[port_def.RESOURCE_NAME]
|
||||
if self._check_update_has_allowed_address_pairs(port):
|
||||
if not port_security:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
self._validate_address_pairs(attrs, neutron_db)
|
||||
else:
|
||||
# remove ATTR_NOT_SPECIFIED
|
||||
attrs[addr_pair.ADDRESS_PAIRS] = []
|
||||
attrs[addr_apidef.ADDRESS_PAIRS] = []
|
||||
|
||||
# security group extension checks
|
||||
if has_ip and port_security:
|
||||
@ -1784,10 +1786,10 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
port_data,
|
||||
ssgids)
|
||||
|
||||
neutron_db[addr_pair.ADDRESS_PAIRS] = (
|
||||
neutron_db[addr_apidef.ADDRESS_PAIRS] = (
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, neutron_db,
|
||||
attrs.get(addr_pair.ADDRESS_PAIRS)))
|
||||
attrs.get(addr_apidef.ADDRESS_PAIRS)))
|
||||
|
||||
self._process_port_create_extra_dhcp_opts(
|
||||
context, port_data, dhcp_opts)
|
||||
@ -1967,7 +1969,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
port_data = port['port']
|
||||
dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS)
|
||||
self._validate_extra_dhcp_options(dhcp_opts)
|
||||
if addr_pair.ADDRESS_PAIRS in attrs:
|
||||
if addr_apidef.ADDRESS_PAIRS in attrs:
|
||||
self._validate_address_pairs(attrs, original_port)
|
||||
self._validate_max_ips_per_port(
|
||||
port_data.get('fixed_ips', []),
|
||||
@ -2090,7 +2092,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
context, port, original_port, ret_port)
|
||||
|
||||
update_assigned_addresses = False
|
||||
if addr_pair.ADDRESS_PAIRS in attrs:
|
||||
if addr_apidef.ADDRESS_PAIRS in attrs:
|
||||
update_assigned_addresses = self.update_address_pairs_on_port(
|
||||
context, id, port, original_port, ret_port)
|
||||
|
||||
@ -4319,7 +4321,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
||||
approved_addrs = [addr['ip_address'] for addr in port['fixed_ips']]
|
||||
# add in the address pair
|
||||
approved_addrs.extend(
|
||||
addr['ip_address'] for addr in port[addr_pair.ADDRESS_PAIRS])
|
||||
addr['ip_address'] for addr in port[addr_apidef.ADDRESS_PAIRS])
|
||||
# add the IPv6 link-local address if there is an IPv6 address
|
||||
if any([netaddr.valid_ipv6(address) for address in approved_addrs]):
|
||||
lla = str(netutils.get_ipv6_addr_by_EUI64(
|
||||
|
@ -14,12 +14,14 @@
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.api.definitions import availability_zone as az_def
|
||||
from neutron_lib.api.definitions import external_net as extnet_apidef
|
||||
from neutron_lib.api.definitions import network as net_def
|
||||
from neutron_lib.api.definitions import port_security as psec
|
||||
from neutron_lib.api import faults
|
||||
from neutron_lib.api.validators import availability_zone as az_validator
|
||||
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
|
||||
from neutron_lib.exceptions import port_security as psec_exc
|
||||
from neutron_lib.services.qos import constants as qos_consts
|
||||
|
||||
@ -48,7 +50,6 @@ from neutron.db import models_v2
|
||||
from neutron.db import portbindings_db
|
||||
from neutron.db import portsecurity_db
|
||||
from neutron.db import securitygroups_db
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import providernet
|
||||
from neutron.extensions import securitygroup as ext_sg
|
||||
@ -1614,7 +1615,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
address_bindings.append(nsx_resources.PacketAddressClassifier(
|
||||
fixed_ip['ip_address'], port['mac_address'], None))
|
||||
|
||||
for pair in port.get(addr_pair.ADDRESS_PAIRS):
|
||||
for pair in port.get(addr_apidef.ADDRESS_PAIRS):
|
||||
address_bindings.append(nsx_resources.PacketAddressClassifier(
|
||||
pair['ip_address'], pair['mac_address'], None))
|
||||
|
||||
@ -1776,7 +1777,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
|
||||
mac_learning_profile_set = False
|
||||
if psec_is_on:
|
||||
address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS)
|
||||
address_pairs = port_data.get(addr_apidef.ADDRESS_PAIRS)
|
||||
if validators.is_attr_set(address_pairs) and address_pairs:
|
||||
mac_learning_profile_set = True
|
||||
profiles.append(self._get_port_security_profile_id())
|
||||
@ -1884,10 +1885,10 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
context, port_data, neutron_db)
|
||||
|
||||
# allowed address pair checks
|
||||
address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS)
|
||||
address_pairs = port_data.get(addr_apidef.ADDRESS_PAIRS)
|
||||
if validators.is_attr_set(address_pairs):
|
||||
if not port_security:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
else:
|
||||
self._validate_address_pairs(address_pairs)
|
||||
self._process_create_allowed_address_pairs(
|
||||
@ -1895,7 +1896,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
address_pairs)
|
||||
else:
|
||||
# remove ATTR_NOT_SPECIFIED
|
||||
port_data[addr_pair.ADDRESS_PAIRS] = []
|
||||
port_data[addr_apidef.ADDRESS_PAIRS] = []
|
||||
|
||||
if port_security and has_ip:
|
||||
self._ensure_default_security_group_on_port(context, port)
|
||||
@ -2534,22 +2535,22 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
if not updated_port[psec.PORTSECURITY]:
|
||||
# has address pairs in request
|
||||
if has_addr_pairs:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
elif not delete_addr_pairs:
|
||||
# check if address pairs are in db
|
||||
updated_port[addr_pair.ADDRESS_PAIRS] = (
|
||||
updated_port[addr_apidef.ADDRESS_PAIRS] = (
|
||||
self.get_allowed_address_pairs(context, id))
|
||||
if updated_port[addr_pair.ADDRESS_PAIRS]:
|
||||
raise addr_pair.AddressPairAndPortSecurityRequired()
|
||||
if updated_port[addr_apidef.ADDRESS_PAIRS]:
|
||||
raise addr_exc.AddressPairAndPortSecurityRequired()
|
||||
|
||||
if delete_addr_pairs or has_addr_pairs:
|
||||
self._validate_address_pairs(
|
||||
updated_port[addr_pair.ADDRESS_PAIRS])
|
||||
updated_port[addr_apidef.ADDRESS_PAIRS])
|
||||
# delete address pairs and read them in
|
||||
self._delete_allowed_address_pairs(context, id)
|
||||
self._process_create_allowed_address_pairs(
|
||||
context, updated_port,
|
||||
updated_port[addr_pair.ADDRESS_PAIRS])
|
||||
updated_port[addr_apidef.ADDRESS_PAIRS])
|
||||
|
||||
# No port security is allowed if the port belongs to an ENS TZ
|
||||
if (updated_port[psec.PORTSECURITY] and
|
||||
@ -2709,7 +2710,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
|
||||
psec_is_on = self._get_port_security_profile_id() in switch_profile_ids
|
||||
|
||||
address_pairs = updated_port.get(addr_pair.ADDRESS_PAIRS)
|
||||
address_pairs = updated_port.get(addr_apidef.ADDRESS_PAIRS)
|
||||
mac_learning_profile_set = (
|
||||
validators.is_attr_set(address_pairs) and address_pairs and
|
||||
psec_is_on)
|
||||
@ -2867,9 +2868,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
# revert allowed address pairs
|
||||
if port_security:
|
||||
orig_pair = original_port.get(
|
||||
addr_pair.ADDRESS_PAIRS)
|
||||
addr_apidef.ADDRESS_PAIRS)
|
||||
updated_pair = updated_port.get(
|
||||
addr_pair.ADDRESS_PAIRS)
|
||||
addr_apidef.ADDRESS_PAIRS)
|
||||
if orig_pair != updated_pair:
|
||||
self._delete_allowed_address_pairs(context, id)
|
||||
if orig_pair:
|
||||
|
@ -37,7 +37,7 @@ from neutron.db import allowedaddresspairs_db as addr_pair_db
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import l3_db
|
||||
from neutron.db import portsecurity_db
|
||||
from neutron.extensions import allowedaddresspairs
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.callbacks import registry
|
||||
from neutron_lib import constants as const
|
||||
from neutron_lib import context as neutron_context
|
||||
@ -180,7 +180,7 @@ def list_missing_ports(resource, event, trigger, **kwargs):
|
||||
# neutron spoofguard profile should be attached
|
||||
port_sec, has_ip = plugin._determine_port_security_and_has_ip(
|
||||
admin_cxt, port)
|
||||
addr_pair = port.get(allowedaddresspairs.ADDRESS_PAIRS)
|
||||
addr_pair = port.get(addr_apidef.ADDRESS_PAIRS)
|
||||
if port_sec and (has_ip or addr_pair):
|
||||
prf_id = profiles_dict[spoofguard_profile_key]
|
||||
if prf_id != spoofguard_profile_id:
|
||||
|
@ -12,10 +12,11 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.api.definitions import port_security as psec
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs
|
||||
|
||||
from vmware_nsx.tests.unit.nsx_mh import test_plugin as test_nsx_plugin
|
||||
@ -33,7 +34,7 @@ class TestAllowedAddressPairsNSXv2(test_nsx_plugin.NsxPluginV2TestCase,
|
||||
with self.network() as net:
|
||||
res = self._create_port(self.fmt, net['network']['id'])
|
||||
port = self.deserialize(self.fmt, res)
|
||||
self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], [])
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], [])
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_create_port_security_false_allowed_address_pairs(self):
|
||||
@ -60,7 +61,7 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
|
||||
port = self.deserialize(self.fmt, res)
|
||||
address_pairs = [{'mac_address': '00:00:00:00:00:01',
|
||||
'ip_address': '10.0.0.1/24'}]
|
||||
update_port = {'port': {addr_pair.ADDRESS_PAIRS:
|
||||
update_port = {'port': {addr_apidef.ADDRESS_PAIRS:
|
||||
address_pairs}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
@ -95,7 +96,7 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
|
||||
fixed_ips = [{'subnet_id': subnet['subnet']['id'],
|
||||
'ip_address': '10.0.0.2'}]
|
||||
res = self._create_port(self.fmt, network['network']['id'],
|
||||
arg_list=(addr_pair.ADDRESS_PAIRS,
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,
|
||||
'fixed_ips'),
|
||||
allowed_address_pairs=address_pairs,
|
||||
fixed_ips=fixed_ips)
|
||||
@ -107,11 +108,11 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
|
||||
with self.network() as net:
|
||||
address_pairs = [{'ip_address': '10.0.0.1'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_pair.ADDRESS_PAIRS,),
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
address_pairs[0]['mac_address'] = port['port']['mac_address']
|
||||
self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS],
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
@ -119,14 +120,14 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
|
||||
with self.network() as net:
|
||||
address_pairs = [{'ip_address': '10.0.0.1'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_pair.ADDRESS_PAIRS,),
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
update_port = {'port': {addr_pair.ADDRESS_PAIRS: []}}
|
||||
update_port = {'port': {addr_apidef.ADDRESS_PAIRS: []}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
port = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS], [])
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], [])
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
def test_update_add_address_pairs(self):
|
||||
@ -134,13 +135,13 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
|
||||
res = self._create_port(self.fmt, net['network']['id'])
|
||||
port = self.deserialize(self.fmt, res)
|
||||
address_pairs = [{'ip_address': '10.0.0.1'}]
|
||||
update_port = {'port': {addr_pair.ADDRESS_PAIRS:
|
||||
update_port = {'port': {addr_apidef.ADDRESS_PAIRS:
|
||||
address_pairs}}
|
||||
req = self.new_update_request('ports', update_port,
|
||||
port['port']['id'])
|
||||
port = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
address_pairs[0]['mac_address'] = port['port']['mac_address']
|
||||
self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS],
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
@ -164,12 +165,12 @@ class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
|
||||
address_pairs = [{'ip_address': '10.0.0.1'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=('port_security_enabled',
|
||||
addr_pair.ADDRESS_PAIRS,),
|
||||
addr_apidef.ADDRESS_PAIRS,),
|
||||
port_security_enabled=True,
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
self.assertTrue(port['port'][psec.PORTSECURITY])
|
||||
address_pairs[0]['mac_address'] = port['port']['mac_address']
|
||||
self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS],
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
@ -21,7 +21,6 @@ import mock
|
||||
import netaddr
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.extensions import address_scope
|
||||
from neutron.extensions import allowedaddresspairs as addr_pair
|
||||
from neutron.extensions import dvr as dist_router
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import l3_ext_gw_mode
|
||||
@ -41,6 +40,7 @@ import neutron.tests.unit.extensions.test_portsecurity as test_psec
|
||||
import neutron.tests.unit.extensions.test_securitygroup as ext_sg
|
||||
from neutron.tests.unit import testlib_api
|
||||
from neutron_lib.api.definitions import address_scope as addr_apidef
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addrp_apidef
|
||||
from neutron_lib.api.definitions import external_net as extnet_apidef
|
||||
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
|
||||
from neutron_lib.api.definitions import port_security as psec
|
||||
@ -4462,7 +4462,7 @@ class TestNSXvAllowedAddressPairs(NsxVPluginV2TestCase,
|
||||
'ip_address': '192.168.1.0/24'}]
|
||||
self._create_port(self.fmt, net['network']['id'],
|
||||
expected_res_status=webob.exc.HTTPBadRequest.code,
|
||||
arg_list=(addr_pair.ADDRESS_PAIRS,),
|
||||
arg_list=(addrp_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user