Tempest: Fixed failed few tempest scenario test cases

- Only admin can confiugre SNAT on router. It was failing when
tenant configured SNAT and that failure was expected.
- Also added one test case to test this SNATs behavior.
- nsx_firewall_realization takes variable time based on the tests
we execute in parallel. So removed hard coded sleep and put a
adaptive sleep loop with timeout.
- Added adaptive sleep in provider security test case.
- Added parallel execution compatibility on provider security tests
- Fixed one of the provider sec test

vmware_nsx_tempest.tests.nsxv3.scenario.test_router_nonat_ops.TestRouterNoNATOps.test_router_nonat_to_nat_ops[id-a0274738-d3e7-49db-bf10-a5563610940d,nsxv3]       82.734
vmware_nsx_tempest.tests.nsxv3.scenario.test_router_nonat_ops.TestRouterNoNATOps.test_router_nat_to_nonat_ops[id-5e5bfdd4-0962-47d3-a89b-7ce64322b53e,nsxv3]       80.324
vmware_nsx_tempest.tests.nsxv3.scenario.test_router_nonat_ops.TestRouterNoNATOps.test_only_admin_canconfigure_snat[id-971e8e8b-3cf2-47a9-ac24-5b19f586731c,nsxv3]  16.793
vmware_nsx_tempest.tests.nsxv3.api.test_nsx_security_groups.NSXv3SecGroupTest.test_create_update_nsx_security_group[id-904ca2c1-a14d-448b-b723-a7366e613bf1,nsxv3]  9.986
vmware_nsx_tempest.tests.nsxv3.api.test_nsx_security_groups.NSXv3SecGroupTest.test_create_nsx_security_group_rule[id-91c298c0-fbbd-4597-b4c6-1a7ecfb8a2de,nsxv3]    9.102
vmware_nsx_tempest.tests.nsxv3.api.test_nsx_security_groups.NSXv3SecGroupTest.test_delete_nsx_security_group[id-e637cc59-c5e6-49b5-a539-e517e780656e,nsxv3]         7.607
vmware_nsx_tempest.tests.nsxv3.api.test_nsx_security_groups.NSXv3SecGroupTest.test_delete_nsx_security_group_rule[id-b6c424e5-3553-4b7d-bd95-8b1f0a860fb4,nsxv3]    6.978
vmware_nsx_tempest.tests.nsxv3.api.test_provider_sec_group.ProviderSecurityGroupTest.test_check_security_group_precedence_at_beckend [4.889487s] ... ok
vmware_nsx_tempest.tests.nsxv3.api.test_provider_sec_group.ProviderSecurityGroupTest.test_multiple_provider_security_group [3.290622s] ... ok
vmware_nsx_tempest.tests.nsxv3.api.test_provider_sec_group.ProviderSecurityGroupTest.test_provider_security_group_rule_at_beckend [4.901785s] ... ok

Change-Id: I90eac04c26808cc825d8b12773b73599d7a1c463
This commit is contained in:
Devang Doshi 2017-05-30 14:50:40 -07:00
parent 3c76f255e5
commit 050d456704
4 changed files with 129 additions and 74 deletions

View File

@ -17,6 +17,9 @@ from oslo_log import log
LOG = log.getLogger(__name__)
# General constants.
ONE_SEC = 1
# L2GW constants.
L2GW = "l2_gateway"
L2GWS = L2GW + "s"
@ -42,5 +45,8 @@ NSX_BACKEND_TIME_INTERVAL = 30
NSX_BACKEND_SMALL_TIME_INTERVAL = 10
NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5
# DFW
NSX_FIREWALL_REALIZED_TIMEOUT = 120
# FWaaS
NO_OF_ENTRIES = 20

View File

@ -24,14 +24,13 @@ from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import nsxv3_client
LOG = logging.getLogger(__name__)
CONF = config.CONF
NSX_FIREWALL_REALIZED_DELAY = 20
class NSXv3SecGroupTest(base.BaseSecGroupTest):
_project_network_cidr = CONF.network.project_network_cidr
@ -85,44 +84,49 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
"rule does not match with %s." %
(key, value))
def _wait_till_firewall_gets_realize(self, secgroup, dfw_error_msg=""):
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
while nsx_firewall_time_counter < \
constants.NSX_FIREWALL_REALIZED_TIMEOUT and \
not nsx_dfw_section:
nsx_firewall_time_counter += 1
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
time.sleep(constants.ONE_SEC)
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section, dfw_error_msg)
return nsx_nsgroup, nsx_dfw_section
@test.attr(type='nsxv3')
@decorators.idempotent_id('904ca2c1-a14d-448b-b723-a7366e613bf1')
def test_create_update_nsx_security_group(self):
# Create a security group
group_create_body, name = self._create_security_group()
secgroup = group_create_body['security_group']
time.sleep(NSX_FIREWALL_REALIZED_DELAY)
LOG.info("Create security group with name %(name)s and id %(id)s",
{'name': secgroup['name'], 'id': secgroup['id']})
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
# List security groups and verify if created group is there in response
list_body = self.security_groups_client.list_security_groups()
secgroup_list = list()
for sg in list_body['security_groups']:
secgroup_list.append(sg['id'])
self.assertIn(secgroup['id'], secgroup_list)
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'], secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section)
# Update the security group
new_name = data_utils.rand_name('security-')
new_description = data_utils.rand_name('security-description')
update_body = self.security_groups_client.update_security_group(
secgroup['id'],
name=new_name,
description=new_description)
secgroup['id'], name=new_name, description=new_description)
# Verify if security group is updated
updated_secgroup = update_body['security_group']
self.assertEqual(updated_secgroup['name'], new_name)
self.assertEqual(updated_secgroup['description'], new_description)
nsx_nsgroup = self.nsx.get_ns_group(updated_secgroup['name'],
updated_secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(
new_name, secgroup['id'])
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section,
"Firewall section %s is not updated!")
dfw_error_msg = "Firewall section is not updated for %s!" % \
updated_secgroup['name']
self._wait_till_firewall_gets_realize(updated_secgroup, dfw_error_msg)
@test.attr(type='nsxv3')
@decorators.idempotent_id('e637cc59-c5e6-49b5-a539-e517e780656e')
@ -132,15 +136,13 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
create_body = self.security_groups_client.create_security_group(
name=name)
secgroup = create_body['security_group']
time.sleep(NSX_FIREWALL_REALIZED_DELAY)
nsx_nsgroup = self.nsx.get_ns_group(name, secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(name, secgroup['id'])
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
self.assertEqual(secgroup['name'], name)
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section)
# Delete the security group
self._delete_security_group(secgroup['id'])
nsx_nsgroup = self.nsx.get_ns_group(name, secgroup['id'])
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(name, secgroup['id'])
self.assertIsNone(nsx_nsgroup)
self.assertIsNone(nsx_dfw_section)
@ -150,12 +152,10 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
def test_create_nsx_security_group_rule(self):
# Create a security group
create_body, _ = self._create_security_group()
time.sleep(NSX_FIREWALL_REALIZED_DELAY)
secgroup = create_body['security_group']
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'], secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
self.assertIsNotNone(nsx_dfw_section)
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
nsx_nsgroup, nsx_dfw_section = self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg)
# Create rules for each protocol
protocols = ['tcp', 'udp', 'icmp']
client = self.security_group_rules_client
@ -210,13 +210,10 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
def test_delete_nsx_security_group_rule(self):
# Create a security group
create_body, _ = self._create_security_group()
time.sleep(NSX_FIREWALL_REALIZED_DELAY)
secgroup = create_body['security_group']
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'], secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section)
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
nsx_nsgroup, nsx_dfw_section = self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg)
# Create a security group rule
client = self.security_group_rules_client
rule_create_body = client.create_security_group_rule(

View File

@ -26,7 +26,7 @@ from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import nsxv3_client
CONF = config.CONF
PROVIDER_SECURITY_GRP = 'provider-sec-group'
LOG = constants.log.getLogger(__name__)
@ -70,7 +70,7 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
project_id=None, provider=False):
cmgr = cmgr or self.cmgr_adm
sg_client = cmgr.security_groups_client
sg_dict = dict(name=data_utils.rand_name('provider-sec-group'))
sg_dict = dict(name=data_utils.rand_name(PROVIDER_SECURITY_GRP))
if project_id:
sg_dict['tenant_id'] = project_id
if provider:
@ -108,6 +108,23 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
sg = sg_client.show_security_group(security_group_id)
return sg.get('security_group', sg)
def _wait_till_firewall_gets_realize(self, secgroup, dfw_error_msg=""):
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
while nsx_firewall_time_counter < \
constants.NSX_FIREWALL_REALIZED_TIMEOUT and \
not nsx_dfw_section:
nsx_firewall_time_counter += 1
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
time.sleep(constants.ONE_SEC)
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section, dfw_error_msg)
return nsx_nsgroup, nsx_dfw_section
@test.attr(type='nsxv3')
@decorators.idempotent_id('4fc39f02-4fb1-4e5c-bf64-b98dd7f514f7')
def test_provider_security_group_at_beckend(self):
@ -180,7 +197,8 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
sg_rule = self.create_security_group_rule(sg_id, cmgr=self.cmgr_adm,
protocol='icmp')
sg_rule.get('id')
time.sleep(5)
dfw_error_msg = "Firewall section not found for %s!" % sg_name
self._wait_till_firewall_gets_realize(sg, dfw_error_msg)
firewall_section = self.nsx.get_firewall_section(sg_name, sg_id)
output = self.nsx.get_firewall_section_rules(firewall_section)
self.assertEqual('DROP', output[0]['action'])
@ -220,13 +238,11 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
def test_multiple_provider_security_group(self):
sg = self.create_security_provider_group(self.cmgr_adm, provider=True)
sg_id = sg.get('id')
sg.get('name')
sg_rule = self.create_security_group_rule(sg_id, cmgr=self.cmgr_adm,
self.create_security_group_rule(sg_id, cmgr=self.cmgr_adm,
protocol='icmp')
sg_rule.get('id')
sg1 = self.create_security_provider_group(self.cmgr_adm, provider=True)
sg2 = self.create_security_provider_group(self.cmgr_adm, provider=True)
self.assertNotEqual(sg1.get('id'), sg2.get('id'))
self.assertRaises(exceptions.BadRequest,
self.create_security_provider_group,
self.cmgr_adm, provider=True)
@test.attr(type='nsxv3')
@decorators.idempotent_id('275abe9f-4f01-46e5-bde0-0b6840290d3b')
@ -280,28 +296,40 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
@test.attr(type='nsxv3')
@decorators.idempotent_id('dfc6bb8e-ba7b-4ce5-b6ee-0d0830d7e152')
def test_check_security_group_precedence_at_beckend(self):
count = 0
project_id = self.cmgr_adm.networks_client.tenant_id
provider_sg = \
self.create_security_provider_group(self.cmgr_adm,
project_id=project_id,
provider=True)
provider_sg_name = provider_sg.get('name')
default_sg = \
self.create_security_provider_group(self.cmgr_adm,
project_id=project_id,
provider=False)
sg_name = default_sg.get('name')
firewall_section = self.nsx.get_firewall_sections()
for sec_name in firewall_section:
if (provider_sg_name in sec_name['display_name'] and
sg_name not in sec_name['display_name']):
if count == 0:
LOG.info("Provider group has high priority over "
"default sec group")
# Wait till provider sec gets realize in NSX.
nsx_firewall_time_counter = 0
provider_sec = False
while nsx_firewall_time_counter < \
constants.NSX_FIREWALL_REALIZED_TIMEOUT and not provider_sec:
nsx_firewall_time_counter += 1
firewall_sections = self.nsx.get_firewall_sections()
for section in firewall_sections:
if provider_sg_name in section['display_name']:
provider_sec = True
break
count += count
self.assertIn(provider_sg_name, sec_name['display_name'])
time.sleep(constants.ONE_SEC)
for section in firewall_sections:
# when execute tempest in parallel fashion,
# we create provider security group for other tests,
# NSX will return all provider security group from DFW.
if PROVIDER_SECURITY_GRP in section['display_name'] and \
provider_sg_name not in section['display_name']:
pass
else:
# check the sec name
break
msg = "Provider group does not have highest priority " \
"over default security group"
self.assertIn(provider_sg_name, section['display_name'], msg)
@test.attr(type='nsxv3')
@decorators.idempotent_id('37d8fbfc-eb3f-40c8-a146-70f5df937a2e')

View File

@ -21,6 +21,7 @@ from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest.scenario import manager
from tempest import test
@ -75,6 +76,7 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
self.keypairs = {}
self.servers = []
self.config_drive = CONF.compute_feature_enabled.config_drive
self.cmgr_adm = self.get_client_manager('admin')
def _setup_network_topo(self, enable_snat=None):
self.security_group = self._create_security_group()
@ -83,7 +85,8 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
self.router = self._create_router(
router_name=data_utils.rand_name('router-smoke'),
external_network_id=CONF.network.public_network_id,
enable_snat=enable_snat)
enable_snat=enable_snat,
routers_client=self.cmgr_adm.routers_client)
self.routers_client.add_router_interface(
self.router['id'], subnet_id=self.subnet['id'])
self.addCleanup(self.routers_client.remove_router_interface,
@ -107,18 +110,27 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
subnet_id=i['fixed_ips'][0]['subnet_id'])
self.routers_client.delete_router(router['id'])
def _update_router(self, router_id, router_client, ext_gw_info):
router_client.update_router(
router_id=router_id, external_gateway_info=ext_gw_info)
def _create_router(self, router_name=None, admin_state_up=True,
external_network_id=None, enable_snat=None,
routers_client=None,
**kwargs):
ext_gw_info = {}
if external_network_id:
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
if not routers_client:
routers_client = self.routers_client
body = self.routers_client.create_router(
name=router_name, external_gateway_info=ext_gw_info,
name=router_name,
admin_state_up=admin_state_up, **kwargs)
router = body['router']
# Only admin can configure SNAT parameteters
self._update_router(router['id'], routers_client, ext_gw_info)
self.addCleanup(self._cleanup_router, router)
return router
@ -246,11 +258,11 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
self._check_network_vm_connectivity(network=self.network)
self._check_nonat_network_connectivity(should_connect=False)
# Update router to disable snat and disassociate floating ip
self.routers_client.update_router(
self.router['id'],
external_gateway_info = {
'network_id': CONF.network.public_network_id,
'enable_snat': (not snat)})
'enable_snat': (not snat)}
self._update_router(self.router['id'], self.cmgr_adm.routers_client,
external_gateway_info)
floating_ip, server = self.floating_ip_tuple
self._disassociate_floating_ip(floating_ip)
nsx_router = self.nsx.get_logical_router(
@ -285,11 +297,11 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
self.assertTrue(router_adv['advertise_nsx_connected_routes'], adv_msg)
self._check_nonat_network_connectivity()
# Update router to Enable snat and associate floating ip
self.routers_client.update_router(
self.router['id'],
external_gateway_info = {
'network_id': CONF.network.public_network_id,
'enable_snat': (not snat)})
'enable_snat': (not snat)}
self._update_router(self.router['id'], self.cmgr_adm.routers_client,
external_gateway_info)
floating_ip = self.create_floating_ip(self.server)
self.floating_ip_tuple = Floating_IP_tuple(floating_ip, self.server)
nsx_router = self.nsx.get_logical_router(
@ -317,3 +329,15 @@ class TestRouterNoNATOps(manager.NetworkScenarioTest):
def test_router_nonat_to_nat_ops(self):
"""Test update router from NoNAT to NATed scenario"""
self._test_router_nat_update_when_no_snat()
@test.attr(type='nsxv3')
@decorators.idempotent_id('971e8e8b-3cf2-47a9-ac24-5b19f586731c')
def test_only_admin_can_configure_snat(self):
"""Only admin can configure the SNAT"""
self.security_group = self._create_security_group()
self.network = self._create_network()
self.subnet = self._create_subnet(self.network)
self.assertRaises(exceptions.Forbidden, self._create_router,
router_name=data_utils.rand_name('router-smoke'),
external_network_id=CONF.network.public_network_id,
enable_snat=False)