369 lines
18 KiB
Python
369 lines
18 KiB
Python
# Copyright (c) 2014 OpenStack Foundation.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
|
from neutron_lib.api.definitions import port_security as psec
|
|
from oslo_config import cfg
|
|
|
|
from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs
|
|
|
|
from vmware_nsx.tests.unit.nsx_p import test_plugin as test_p_plugin
|
|
from vmware_nsx.tests.unit.nsx_v import test_plugin as test_nsx_v_plugin
|
|
from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants
|
|
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
|
|
|
|
|
|
class TestAllowedAddressPairsNSXv2(test_v3_plugin.NsxV3PluginTestCaseMixin,
                                   ext_pairs.TestAllowedAddressPairs):
    """Allowed-address-pairs tests combined with the NSX-v3 test mixin."""

    # TODO(arosen): move to ext_pairs.TestAllowedAddressPairs once all
    # plugins do this correctly.
    def test_create_port_no_allowed_address_pairs(self):
        # A port created without the attribute must report an empty list.
        with self.network() as net:
            response = self._create_port(self.fmt, net['network']['id'])
            created = self.deserialize(self.fmt, response)['port']
            self.assertEqual(created[addr_apidef.ADDRESS_PAIRS], [])
            self._delete('ports', created['id'])

    def test_create_port_security_false_allowed_address_pairs(self):
        # Overrides the inherited test so it is skipped for this plugin.
        self.skipTest('TBD')
|
|
|
|
|
|
class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
                                  ext_pairs.TestAllowedAddressPairs):
    """Allowed-address-pairs tests combined with the NSX-P test mixin."""

    def setUp(self, plugin=test_p_plugin.PLUGIN_NAME,
              ext_mgr=None,
              service_plugins=None):
        # Hand the arguments straight through to the parent setUp.
        super(TestAllowedAddressPairsNSXp, self).setUp(
            plugin=plugin,
            ext_mgr=ext_mgr,
            service_plugins=service_plugins)

    def test_create_bad_address_pairs_with_cidr(self):
        # A host address carrying a prefix length must yield a 400.
        bad_pair = {'mac_address': '00:00:00:00:00:01',
                    'ip_address': '10.0.0.1/24'}
        self._create_port_with_address_pairs([bad_pair], 400)

    def test_create_port_allowed_address_pairs_v6(self):
        with self.network() as net:
            net_id = net['network']['id']

            # Single IPv6, then IPv6 cidr: both must be stored as given,
            # with the port's own MAC filled in.
            for ip_address in ('1001::12', '1001::/64'):
                pairs = [{'ip_address': ip_address}]
                response = self._create_port(
                    self.fmt, net_id,
                    arg_list=(addr_apidef.ADDRESS_PAIRS,),
                    allowed_address_pairs=pairs)
                created = self.deserialize(self.fmt, response)['port']
                pairs[0]['mac_address'] = created['mac_address']
                self.assertEqual(created[addr_apidef.ADDRESS_PAIRS],
                                 pairs)
                self._delete('ports', created['id'])

            # Illegal IPv6 cidr
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=[{'ip_address': '1001::12/64'}])
            self.assertIn('NeutronError',
                          self.deserialize(self.fmt, response))

            # Too many ipv6 pairs (16 entries: 1001::1 .. 1001::16)
            cfg.CONF.set_default('max_allowed_address_pair', 300)
            pairs = [{'ip_address': '1001::%s' % idx}
                     for idx in range(1, 17)]
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=pairs)
            self.assertIn('NeutronError',
                          self.deserialize(self.fmt, response))

            # Legal number of ipv6 pairs (12 entries)
            pairs = [{'ip_address': '1001::%s' % idx}
                     for idx in range(1, 13)]
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=pairs)
            created = self.deserialize(self.fmt, response)
            self.assertNotIn('NeutronError', created)
            self._delete('ports', created['port']['id'])

    def test_create_port_allowed_address_pairs_v4(self):
        with self.network() as net:
            net_id = net['network']['id']

            # Single IPv4, then IPv4 cidr: both must be stored as given,
            # with the port's own MAC filled in.
            for ip_address in ('10.0.0.12', '10.0.0.0/24'):
                pairs = [{'ip_address': ip_address}]
                response = self._create_port(
                    self.fmt, net_id,
                    arg_list=(addr_apidef.ADDRESS_PAIRS,),
                    allowed_address_pairs=pairs)
                created = self.deserialize(self.fmt, response)['port']
                pairs[0]['mac_address'] = created['mac_address']
                self.assertEqual(created[addr_apidef.ADDRESS_PAIRS],
                                 pairs)
                self._delete('ports', created['id'])

            # Illegal IPv4 cidr
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=[{'ip_address': '10.0.0.1/24'}])
            self.assertIn('NeutronError',
                          self.deserialize(self.fmt, response))

            # Too many ipv4 pairs (128 entries: 10.0.0.1 .. 10.0.0.128)
            cfg.CONF.set_default('max_allowed_address_pair', 300)
            pairs = [{'ip_address': '10.0.0.%s' % idx}
                     for idx in range(1, 129)]
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=pairs)
            self.assertIn('NeutronError',
                          self.deserialize(self.fmt, response))

            # Legal number of ipv4 pairs (124 entries)
            pairs = [{'ip_address': '10.0.0.%s' % idx}
                     for idx in range(1, 125)]
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=pairs)
            created = self.deserialize(self.fmt, response)
            self.assertNotIn('NeutronError', created)
            self._delete('ports', created['port']['id'])

    def test_update_add_bad_address_pairs_with_cidr(self):
        # Updating an existing port with a host/prefix pair must yield 400.
        with self.network() as net:
            response = self._create_port(self.fmt, net['network']['id'])
            port_id = self.deserialize(self.fmt, response)['port']['id']
            body = {'port': {addr_apidef.ADDRESS_PAIRS: [
                {'mac_address': '00:00:00:00:00:01',
                 'ip_address': '10.0.0.1/24'}]}}
            update_req = self.new_update_request('ports', body, port_id)
            update_res = update_req.get_response(self.api)
            self.assertEqual(update_res.status_int, 400)
            self._delete('ports', port_id)

    def test_mac_configuration(self):
        # Both non-colon MAC notations below are expected to be accepted.
        for mac in ('fa16.3e3e.3d01', 'fa-16-3e-3e-3d-c4'):
            pairs = [{'mac_address': mac,
                      'ip_address': '10.0.0.1'}]
            self._create_port_with_address_pairs(pairs, 201)

    def test_create_port_security_false_allowed_address_pairs(self):
        # Overrides the inherited test so it is skipped for this plugin.
        self.skipTest('TBD')

    def test_create_overlap_with_fixed_ip(self):
        # Overrides the inherited test so it is skipped for this plugin.
        self.skipTest('Not supported')
|
|
|
|
|
|
class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
                                   ext_pairs.TestAllowedAddressPairs):
    """Allowed-address-pairs tests combined with the NSX-v3 test mixin."""

    def setUp(self, plugin=v3_constants.PLUGIN_NAME,
              ext_mgr=None,
              service_plugins=None):
        # Hand the arguments straight through to the parent setUp.
        super(TestAllowedAddressPairsNSXv3, self).setUp(
            plugin=plugin,
            ext_mgr=ext_mgr,
            service_plugins=service_plugins)

    def test_create_bad_address_pairs_with_cidr(self):
        # A host address carrying a prefix length must yield a 400.
        bad_pair = {'mac_address': '00:00:00:00:00:01',
                    'ip_address': '10.0.0.1/24'}
        self._create_port_with_address_pairs([bad_pair], 400)

    def test_create_port_allowed_address_pairs_v6(self):
        with self.network() as net:
            net_id = net['network']['id']

            # Single IPv6 address, then IPv6 cidr: both must be stored as
            # given, with the port's own MAC filled in.
            for ip_address in ('1001::12', '1001::/64'):
                pairs = [{'ip_address': ip_address}]
                response = self._create_port(
                    self.fmt, net_id,
                    arg_list=(addr_apidef.ADDRESS_PAIRS,),
                    allowed_address_pairs=pairs)
                created = self.deserialize(self.fmt, response)['port']
                pairs[0]['mac_address'] = created['mac_address']
                self.assertEqual(created[addr_apidef.ADDRESS_PAIRS],
                                 pairs)
                self._delete('ports', created['id'])

            # Illegal IPv6 cidr
            response = self._create_port(
                self.fmt, net_id,
                arg_list=(addr_apidef.ADDRESS_PAIRS,),
                allowed_address_pairs=[{'ip_address': '1001::12/64'}])
            self.assertIn('NeutronError',
                          self.deserialize(self.fmt, response))

    def test_update_add_bad_address_pairs_with_cidr(self):
        # Updating an existing port with a host/prefix pair must yield 400.
        with self.network() as net:
            response = self._create_port(self.fmt, net['network']['id'])
            port_id = self.deserialize(self.fmt, response)['port']['id']
            body = {'port': {addr_apidef.ADDRESS_PAIRS: [
                {'mac_address': '00:00:00:00:00:01',
                 'ip_address': '10.0.0.1/24'}]}}
            update_req = self.new_update_request('ports', body, port_id)
            update_res = update_req.get_response(self.api)
            self.assertEqual(update_res.status_int, 400)
            self._delete('ports', port_id)

    def test_create_port_security_false_allowed_address_pairs(self):
        # Overrides the inherited test so it is skipped for this plugin.
        self.skipTest('TBD')
|
|
|
|
|
|
class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
                                  ext_pairs.TestAllowedAddressPairs):
    """Allowed-address-pairs tests combined with the NSX-V plugin test case.

    Several inherited tests are overridden here so that the address pairs
    they send carry no explicit ``mac_address``; ``test_mac_configuration``
    below expects a pair with an explicit MAC to be rejected with a 400.
    """

    def setUp(self, plugin='vmware_nsx.plugin.NsxVPlugin',
              ext_mgr=None,
              service_plugins=None):
        # Hand the arguments straight through to the parent setUp.
        super(TestAllowedAddressPairsNSXv, self).setUp(
            plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins)

    def test_create_port_security_false_allowed_address_pairs(self):
        # Overrides the inherited test so it is skipped for this plugin.
        self.skipTest('TBD')

    def test_update_port_security_off_address_pairs(self):
        # Overrides the inherited test so it is skipped for this plugin.
        self.skipTest('Not supported')

    def test_create_overlap_with_fixed_ip(self):
        # The allowed address pair uses the same IP as the port's fixed IP;
        # creation is expected to succeed (201).
        address_pairs = [{'ip_address': '10.0.0.2'}]
        with self.network() as network:
            with self.subnet(network=network, cidr='10.0.0.0/24',
                             enable_dhcp=False) as subnet:
                fixed_ips = [{'subnet_id': subnet['subnet']['id'],
                              'ip_address': '10.0.0.2'}]
                res = self._create_port(self.fmt, network['network']['id'],
                                        arg_list=(addr_apidef.ADDRESS_PAIRS,
                                                  'fixed_ips'),
                                        allowed_address_pairs=address_pairs,
                                        fixed_ips=fixed_ips)
                self.assertEqual(res.status_int, 201)
                port = self.deserialize(self.fmt, res)
                self._delete('ports', port['port']['id'])

    def test_create_port_allowed_address_pairs(self):
        with self.network() as net:
            address_pairs = [{'ip_address': '10.0.0.1'}]
            res = self._create_port(self.fmt, net['network']['id'],
                                    arg_list=(addr_apidef.ADDRESS_PAIRS,),
                                    allowed_address_pairs=address_pairs)
            port = self.deserialize(self.fmt, res)
            # The returned pair is expected to carry the port's own MAC.
            address_pairs[0]['mac_address'] = port['port']['mac_address']
            self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
                             address_pairs)
            self._delete('ports', port['port']['id'])

    def _test_create_port_remove_allowed_address_pairs(self, update_value):
        """Create a port with one pair, then clear it via an update.

        :param update_value: value sent for the allowed-address-pairs
            attribute in the update request. Whatever is sent, the updated
            port is expected to report an empty list.
            NOTE(review): presumably the inherited tests call this with
            ``[]`` and ``None`` -- confirm against the base class.
        """
        with self.network() as net:
            address_pairs = [{'ip_address': '10.0.0.1'}]
            res = self._create_port(self.fmt, net['network']['id'],
                                    arg_list=(addr_apidef.ADDRESS_PAIRS,),
                                    allowed_address_pairs=address_pairs)
            port = self.deserialize(self.fmt, res)
            # Send the caller-supplied value; previously this was hard-coded
            # to [] so the parameter had no effect on what was tested.
            update_port = {'port': {addr_apidef.ADDRESS_PAIRS:
                                    update_value}}
            req = self.new_update_request('ports', update_port,
                                          port['port']['id'])
            port = self.deserialize(self.fmt, req.get_response(self.api))
            self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], [])
            self._delete('ports', port['port']['id'])

    def test_update_add_address_pairs(self):
        with self.network() as net:
            res = self._create_port(self.fmt, net['network']['id'])
            port = self.deserialize(self.fmt, res)
            address_pairs = [{'ip_address': '10.0.0.1'}]
            update_port = {'port': {addr_apidef.ADDRESS_PAIRS:
                                    address_pairs}}
            req = self.new_update_request('ports', update_port,
                                          port['port']['id'])
            port = self.deserialize(self.fmt, req.get_response(self.api))
            # The returned pair is expected to carry the port's own MAC.
            address_pairs[0]['mac_address'] = port['port']['mac_address']
            self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
                             address_pairs)
            self._delete('ports', port['port']['id'])

    def test_mac_configuration(self):
        # A pair with an explicit MAC address is expected to be rejected.
        address_pairs = [{'mac_address': '00:00:00:00:00:01',
                          'ip_address': '10.0.0.1'}]
        self._create_port_with_address_pairs(address_pairs, 400)

    def test_equal_to_max_allowed_address_pair(self):
        # Exactly as many pairs as the configured maximum must be accepted.
        cfg.CONF.set_default('max_allowed_address_pair', 3)
        address_pairs = [{'ip_address': '10.0.0.1'},
                         {'ip_address': '10.0.0.2'},
                         {'ip_address': '10.0.0.3'}]
        self._create_port_with_address_pairs(address_pairs, 201)

    def test_create_port_security_true_allowed_address_pairs(self):
        if self._skip_port_security:
            self.skipTest("Plugin does not implement port-security extension")

        with self.network() as net:
            address_pairs = [{'ip_address': '10.0.0.1'}]
            res = self._create_port(self.fmt, net['network']['id'],
                                    arg_list=('port_security_enabled',
                                              addr_apidef.ADDRESS_PAIRS,),
                                    port_security_enabled=True,
                                    allowed_address_pairs=address_pairs)
            port = self.deserialize(self.fmt, res)
            self.assertTrue(port['port'][psec.PORTSECURITY])
            # The returned pair is expected to carry the port's own MAC.
            address_pairs[0]['mac_address'] = port['port']['mac_address']
            self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
                             address_pairs)
            self._delete('ports', port['port']['id'])
|