Merge "NSX|v: refactor unittests with disabled dhcp"

This commit is contained in:
Zuul 2018-11-23 12:47:58 +00:00 committed by Gerrit Code Review
commit 8d24384328

View File

@ -15,7 +15,8 @@
import contextlib
import copy
import itertools
import decorator
from eventlet import greenthread
import mock
@ -58,7 +59,6 @@ from neutron_lib.plugins import directory
from neutron_lib.plugins import utils
from neutron_lib.services.qos import constants as qos_consts
from neutron_lib.utils import helpers
from neutron_lib.utils import net
from oslo_config import cfg
from oslo_utils import uuidutils
import six
@ -118,6 +118,17 @@ def set_az_in_config(name, resource_pool_id="respool-7",
group=group_name)
# Override subnet creation in some tests to create a subnet with dhcp
# disabled
@decorator.decorator
def with_no_dhcp_subnet(f, *args, **kwargs):
obj = args[0]
obj.subnet = obj.no_dhcp_subnet
result = f(*args, **kwargs)
obj.subnet = obj.original_subnet
return result
class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
def _create_network(self, fmt, name, admin_state_up,
@ -279,6 +290,7 @@ class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
plugin_instance.init_complete(None, None, {})
self.context = context.get_admin_context()
self.original_subnet = self.subnet
self.internal_net_id = None
if self.with_md_proxy:
@ -287,6 +299,11 @@ class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE,
'default')['network_id']
def no_dhcp_subnet(self, *args, **kwargs):
if 'enable_dhcp' in kwargs:
return self.original_subnet(*args, **kwargs)
return self.original_subnet(*args, enable_dhcp=False, **kwargs)
def _get_core_plugin_with_dvs(self):
# enable dvs features to allow policy with QOS
cfg.CONF.set_default('use_dvs_features', True, 'nsxv')
@ -1071,19 +1088,9 @@ class TestPortsV2(NsxVPluginV2TestCase,
net_id2 = port['port']['id'] # other net uuid, same mac
self.assertTrue(self.plugin._is_mac_in_use(ctx, net_id2, mac))
@with_no_dhcp_subnet
def test_duplicate_mac_generation(self):
# simulate duplicate mac generation to make sure DBDuplicate is retried
responses = ['12:34:56:78:00:00', '12:34:56:78:00:00',
'12:34:56:78:00:01']
with mock.patch.object(net, 'random_mac_generator',
return_value=itertools.cycle(responses)) as grand_mac:
with self.subnet(enable_dhcp=False) as s:
with self.port(subnet=s) as p1, self.port(subnet=s) as p2:
self.assertEqual('12:34:56:78:00:00',
p1['port']['mac_address'])
self.assertEqual('12:34:56:78:00:01',
p2['port']['mac_address'])
self.assertEqual(3, grand_mac.call_count)
return super(TestPortsV2, self).test_duplicate_mac_generation()
def test_get_ports_count(self):
with self.port(), self.port(), self.port(), self.port() as p:
@ -1096,31 +1103,9 @@ class TestPortsV2(NsxVPluginV2TestCase,
# for DHCP
self.assertEqual(8, count)
@with_no_dhcp_subnet
def test_requested_ips_only(self):
with self.subnet(enable_dhcp=False) as subnet:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21',
'10.0.0.3', '10.0.0.17', '10.0.0.19']
ports_to_delete = []
for i in ips_only:
kwargs = {"fixed_ips": [{'ip_address': i}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ports_to_delete.append(port)
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(i, ips[0]['ip_address'])
self.assertEqual(subnet['subnet']['id'],
ips[0]['subnet_id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
return super(TestPortsV2, self).test_requested_ips_only()
def test_delete_network_port_exists_owned_by_network_race(self):
self.skipTest('Skip need to address in future')
@ -1177,91 +1162,31 @@ class TestPortsV2(NsxVPluginV2TestCase,
def test_create_port_anticipating_allocation(self):
self.skipTest('Multiple fixed ips on a port are not supported')
@with_no_dhcp_subnet
def test_list_ports(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet) as port1,\
self.port(subnet) as port2,\
self.port(subnet) as port3:
self._test_list_resources('port', [port1, port2, port3])
return super(TestPortsV2, self).test_list_ports()
@with_no_dhcp_subnet
def test_list_ports_public_network(self):
with self.network(shared=True) as network:
with self.subnet(network, enable_dhcp=False) as subnet,\
self.port(subnet, tenant_id='tenant_1') as port1,\
self.port(subnet, tenant_id='tenant_2') as port2:
# Admin request - must return both ports
self._test_list_resources('port', [port1, port2])
# Tenant_1 request - must return single port
q_context = context.Context('', 'tenant_1')
self._test_list_resources('port', [port1],
neutron_context=q_context)
# Tenant_2 request - must return single port
q_context = context.Context('', 'tenant_2')
self._test_list_resources('port', [port2],
neutron_context=q_context)
return super(TestPortsV2, self).test_list_ports_public_network()
@with_no_dhcp_subnet
def test_list_ports_with_pagination_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=test_plugin._fake_get_pagination_helper)
helper_patcher.start()
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
return super(TestPortsV2,
self).test_list_ports_with_pagination_emulated()
@with_no_dhcp_subnet
def test_list_ports_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
return super(TestPortsV2,
self).test_list_ports_with_pagination_native()
@with_no_dhcp_subnet
def test_list_ports_with_sort_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_sorting_helper',
new=test_plugin._fake_get_sorting_helper)
helper_patcher.start()
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, admin_state_up='True',
mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
return super(TestPortsV2, self).test_list_ports_with_sort_emulated()
@with_no_dhcp_subnet
def test_list_ports_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, admin_state_up='True',
mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
return super(TestPortsV2, self).test_list_ports_with_sort_native()
def test_list_ports_filtered_by_security_groups(self):
ctx = context.get_admin_context()
@ -1413,93 +1338,23 @@ class TestPortsV2(NsxVPluginV2TestCase,
self.new_update_request('ports',
update, port['port']['id'])
@with_no_dhcp_subnet
def test_ports_vif_host(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.HOST_ID,), **host_arg),\
self.port(subnet, name='name2'):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_host(port)
else:
self.assertFalse(port[portbindings.HOST_ID])
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False)
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_no_portbindings_host(non_admin_port)
return super(TestPortsV2, self).test_ports_vif_host()
@with_no_dhcp_subnet
def test_ports_vif_host_update(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1,\
self.port(subnet, name='name2') as port2:
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}}
req = self.new_update_request(
'ports', data, port1['port']['id'])
req.get_response(self.api)
req = self.new_update_request(
'ports', data, port2['port']['id'])
ctx = context.get_admin_context()
req.get_response(self.api)
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
self.assertEqual('testhosttemp', port[portbindings.HOST_ID])
return super(TestPortsV2, self).test_ports_vif_host_update()
@with_no_dhcp_subnet
def test_ports_vif_details(self):
plugin = directory.get_plugin()
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet), self.port(subnet):
ctx = context.get_admin_context()
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for port in ports:
self._check_response_portbindings(port)
# By default user is admin - now test non admin user
ctx = self._get_non_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self._check_response_no_portbindings(non_admin_port)
return super(TestPortsV2, self).test_ports_vif_details()
@with_no_dhcp_subnet
def test_ports_vnic_type(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.VNIC_TYPE,), **vnic_arg),\
self.port(subnet, name='name2'):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_vnic_type(port)
else:
self.assertEqual(portbindings.VNIC_NORMAL,
port[portbindings.VNIC_TYPE])
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False)
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_portbindings_vnic_type(non_admin_port)
return super(TestPortsV2, self).test_ports_vnic_type()
@with_no_dhcp_subnet
def test_ports_vnic_type_list(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
@ -1534,51 +1389,13 @@ class TestPortsV2(NsxVPluginV2TestCase,
def test_requested_subnet_id_v4_and_v6(self):
self.skipTest('Multiple fixed ips on a port are not supported')
@with_no_dhcp_subnet
def test_update_port_update_ip(self):
"""Test update of port IP.
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet(enable_dhcp=False) as subnet:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.10', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
return super(TestPortsV2, self).test_update_port_update_ip()
@with_no_dhcp_subnet
def test_update_port_update_ips(self):
"""Update IP and associate new IP on port.
Check a port update with the specified subnet_id's. A IP address
will be allocated for each subnet_id.
"""
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': '10.0.0.3'}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.3', ips[0]['ip_address'], '10.0.0.3')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
return super(TestPortsV2, self).test_update_port_update_ips()
def test_update_port_update_ip_dhcp(self):
#Test updating a port IP when the device owner is DHCP
@ -1785,28 +1602,9 @@ class TestPortsV2(NsxVPluginV2TestCase,
test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(ctx_manager.exception.code, 400)
@with_no_dhcp_subnet
def test_list_ports_for_network_owner(self):
with self.network(tenant_id='tenant_1') as network:
with self.subnet(network, enable_dhcp=False) as subnet:
with self.port(subnet, tenant_id='tenant_1') as port1,\
self.port(subnet, tenant_id='tenant_2') as port2:
# network owner request, should return all ports
port_res = self._list_ports(
'json', set_context=True, tenant_id='tenant_1')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(2, len(port_list))
self.assertIn(port1['port']['id'], port_ids)
self.assertIn(port2['port']['id'], port_ids)
# another tenant request, only return ports belong to it
port_res = self._list_ports(
'json', set_context=True, tenant_id='tenant_2')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(1, len(port_list))
self.assertNotIn(port1['port']['id'], port_ids)
self.assertIn(port2['port']['id'], port_ids)
return super(TestPortsV2, self).test_list_ports_for_network_owner()
def test_mac_duplication(self):
# create 2 networks
@ -2089,32 +1887,9 @@ class TestSubnetsV2(NsxVPluginV2TestCase,
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
@with_no_dhcp_subnet
def test_create_subnet_nonzero_cidr(self):
awkward_cidrs = [{'nonezero': '10.129.122.5/8',
'corrected': '10.0.0.0/8'},
{'nonezero': '11.129.122.5/15',
'corrected': '11.128.0.0/15'},
{'nonezero': '12.129.122.5/16',
'corrected': '12.129.0.0/16'},
{'nonezero': '13.129.122.5/18',
'corrected': '13.129.64.0/18'},
{'nonezero': '14.129.122.5/22',
'corrected': '14.129.120.0/22'},
{'nonezero': '15.129.122.5/24',
'corrected': '15.129.122.0/24'},
{'nonezero': '16.129.122.5/28',
'corrected': '16.129.122.0/28'}, ]
for cidr in awkward_cidrs:
with self.subnet(enable_dhcp=False,
cidr=cidr['nonezero']) as subnet:
# the API should accept and correct these cidrs for users
self.assertEqual(cidr['corrected'],
subnet['subnet']['cidr'])
with self.subnet(enable_dhcp=False, cidr='17.129.122.5/32',
gateway_ip=None) as subnet:
self.assertEqual('17.129.122.5/32', subnet['subnet']['cidr'])
return super(TestSubnetsV2, self).test_create_subnet_nonzero_cidr()
def test_create_subnet_ipv6_attributes(self):
# Expected to fail for now as we don't support IPv6 for NSXv
@ -4027,6 +3802,12 @@ class NsxVSecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase):
ext_mgr=ext_mgr)
self.plugin = directory.get_plugin()
self.addCleanup(self.fc2.reset_all)
self.original_subnet = self.subnet
def no_dhcp_subnet(self, *args, **kwargs):
if 'enable_dhcp' in kwargs:
return self.original_subnet(*args, **kwargs)
return self.original_subnet(*args, enable_dhcp=False, **kwargs)
class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups,
@ -4047,16 +3828,10 @@ class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups,
plugin_instance._get_edge_id_and_az_by_rtr_id.return_value = (
False, False)
@with_no_dhcp_subnet
def test_list_ports_security_group(self):
with self.network() as n:
with self.subnet(n, enable_dhcp=False):
self._create_port(self.fmt, n['network']['id'])
req = self.new_list_request('ports')
res = req.get_response(self.api)
ports = self.deserialize(self.fmt, res)
port = ports['ports'][0]
self.assertEqual(len(port[secgrp.SECURITYGROUPS]), 1)
self._delete('ports', port['id'])
return super(NsxVTestSecurityGroup,
self).test_list_ports_security_group()
def test_vnic_security_group_membership(self):
p = directory.get_plugin()