Merge "[Tempest]: Adding of more cases for FWaaS"
This commit is contained in:
commit
a68d379eee
@ -41,3 +41,6 @@ MD_BASE_URL = "http://169.254.169.254/"
|
||||
NSX_BACKEND_TIME_INTERVAL = 30
|
||||
NSX_BACKEND_SMALL_TIME_INTERVAL = 10
|
||||
NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5
|
||||
|
||||
# FWaaS
|
||||
NO_OF_ENTRIES = 20
|
||||
|
@ -25,6 +25,7 @@ from tempest.lib import decorators
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
from tempest import test
|
||||
|
||||
from vmware_nsx_tempest.common import constants
|
||||
from vmware_nsx_tempest.services import fwaas_client as FWAASC
|
||||
from vmware_nsx_tempest.services import nsxv_client
|
||||
|
||||
@ -121,7 +122,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
"status": status,
|
||||
}
|
||||
raise lib_exc.TimeoutException(msg)
|
||||
time.sleep(1)
|
||||
time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
|
||||
|
||||
def _wait_firewall_ready(self, firewall_id):
|
||||
self._wait_firewall_while(firewall_id,
|
||||
@ -189,6 +190,110 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
m = ("Timed out waiting for firewall %s deleted" % fw_id)
|
||||
raise lib_exc.TimeoutException(m)
|
||||
|
||||
def _check_firewall_rule_exists_at_backend(self, rules,
|
||||
firewall_rule_name):
|
||||
for rule in rules:
|
||||
if rule['name'] in firewall_rule_name:
|
||||
self.assertIn(rule['name'], firewall_rule_name)
|
||||
return True
|
||||
return False
|
||||
|
||||
def _create_firewall_rule_name(self, body):
|
||||
firewall_rule_name = body['firewall_rule']['name']
|
||||
firewall_rule_name = "Fwaas-" + firewall_rule_name
|
||||
return firewall_rule_name
|
||||
|
||||
def _create_firewall_advanced_topo(self, router_type):
|
||||
fw_rule_id_list = []
|
||||
router = self.create_router_by_type(router_type)
|
||||
self.addCleanup(self._try_delete_router, router)
|
||||
edges = self.vsm.get_all_edges()
|
||||
for key in edges:
|
||||
if router['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
rules_before = len(rules)
|
||||
for rule_id in range(0, constants.NO_OF_ENTRIES):
|
||||
if rule_id % 2 == 0:
|
||||
action = "allow"
|
||||
protocol = "tcp"
|
||||
else:
|
||||
action = "allow"
|
||||
protocol = "udp"
|
||||
firewall_rule = self.fwaasv1_client.create_firewall_rule(
|
||||
name=data_utils.rand_name("fw-rule"),
|
||||
action=action,
|
||||
protocol=protocol)
|
||||
fw_rule_id = firewall_rule['firewall_rule']['id']
|
||||
firewall_name = self._create_firewall_rule_name(firewall_rule)
|
||||
self.addCleanup(self._try_delete_rule, fw_rule_id)
|
||||
fw_rule_id_list.append(fw_rule_id)
|
||||
# Update firewall policy
|
||||
body = self.fwaasv1_client.create_firewall_policy(
|
||||
name=data_utils.rand_name("fw-policy"))
|
||||
fw_policy_id = body['firewall_policy']['id']
|
||||
self.addCleanup(self._try_delete_policy, fw_policy_id)
|
||||
# Insert rule to firewall policy
|
||||
for fw_rule_id in fw_rule_id_list:
|
||||
self.fwaasv1_client.insert_firewall_rule_in_policy(
|
||||
fw_policy_id, fw_rule_id, '', '')
|
||||
firewall_1 = self.fwaasv1_client.create_firewall(
|
||||
name=data_utils.rand_name("firewall"),
|
||||
firewall_policy_id=fw_policy_id,
|
||||
router_ids=[router['id']])
|
||||
created_firewall = firewall_1['firewall']
|
||||
self.addCleanup(self._try_delete_firewall, created_firewall['id'])
|
||||
# Wait for the firewall resource to become ready
|
||||
self._wait_until_ready(created_firewall['id'])
|
||||
firewall_topo = dict(router=router, firewall_name=firewall_name,
|
||||
fw_policy_id=fw_policy_id,
|
||||
firewall_id=created_firewall['id'],
|
||||
rules_before=rules_before)
|
||||
return firewall_topo
|
||||
|
||||
def _create_firewall_basic_topo(self, router_type, policy=None):
|
||||
router = self.create_router_by_type(router_type)
|
||||
self.addCleanup(self._try_delete_router, router)
|
||||
body = self.fwaasv1_client.create_firewall_rule(
|
||||
name=data_utils.rand_name("fw-rule"),
|
||||
action="allow",
|
||||
protocol="tcp")
|
||||
fw_rule_id1 = body['firewall_rule']['id']
|
||||
firewall_name = self._create_firewall_rule_name(body)
|
||||
self.addCleanup(self._try_delete_rule, fw_rule_id1)
|
||||
# Create firewall policy
|
||||
if not policy:
|
||||
body = self.fwaasv1_client.create_firewall_policy(
|
||||
name=data_utils.rand_name("fw-policy"))
|
||||
fw_policy_id = body['firewall_policy']['id']
|
||||
self.addCleanup(self._try_delete_policy, fw_policy_id)
|
||||
# Insert rule to firewall policy
|
||||
self.fwaasv1_client.insert_firewall_rule_in_policy(
|
||||
fw_policy_id, fw_rule_id1, '', '')
|
||||
else:
|
||||
fw_policy_id = policy
|
||||
# Create firewall
|
||||
firewall_1 = self.fwaasv1_client.create_firewall(
|
||||
name=data_utils.rand_name("firewall"),
|
||||
firewall_policy_id=fw_policy_id,
|
||||
router_ids=[router['id']])
|
||||
created_firewall = firewall_1['firewall']
|
||||
self.addCleanup(self._try_delete_firewall, created_firewall['id'])
|
||||
# Wait for the firewall resource to become ready
|
||||
self._wait_until_ready(created_firewall['id'])
|
||||
firewall_topo = dict(router=router, firewall_name=firewall_name,
|
||||
fw_policy_id=fw_policy_id,
|
||||
fw_rule_id1=fw_rule_id1,
|
||||
firewall_id=created_firewall['id'])
|
||||
return firewall_topo
|
||||
|
||||
def _get_list_fw_rule_ids(self, fw_policy_id):
|
||||
fw_policy = self.fwaasv1_client.show_firewall_policy(
|
||||
fw_policy_id)
|
||||
return [ruleid for ruleid in fw_policy['firewall_policy']
|
||||
['firewall_rules']]
|
||||
|
||||
def create_router_by_type(self, router_type, name=None, **kwargs):
|
||||
routers_client = self.manager.routers_client
|
||||
router_name = name or data_utils.rand_name('mtz-')
|
||||
@ -435,6 +540,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
self._create_firewall_basic_topo('distributed',
|
||||
firewall_topo1['fw_policy_id'])
|
||||
edges = self.vsm.get_all_edges()
|
||||
firewall_topo2['router']['name'] += '-plr'
|
||||
for key in edges:
|
||||
if firewall_topo1['router']['name'] in key['name']:
|
||||
edge_id_excl = key['id']
|
||||
@ -442,11 +548,11 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
edge_id_dist = key['id']
|
||||
if edge_id_excl and edge_id_dist:
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id_excl)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id_excl)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo1['firewall_name']))
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id_dist)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id_dist)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo1['firewall_name']))
|
||||
@ -467,11 +573,13 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
edge_id_dist = key['id']
|
||||
if edge_id_excl and edge_id_dist:
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id_excl)
|
||||
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id_excl)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo1['firewall_name']))
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id_dist)
|
||||
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id_dist)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo2['firewall_name']))
|
||||
@ -504,65 +612,20 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
for rule in rules:
|
||||
if rule['name'] in ('VSERule', 'MDServiceIP', 'MDInterEdgeNet'):
|
||||
continue
|
||||
if rule_no == 1:
|
||||
self.assertIn(rule['name'], firewall_rule_name_2)
|
||||
self.assertIn(rule['name'], firewall_rule_name_2,
|
||||
"Rule exists at position 1")
|
||||
rule_no += rule_no
|
||||
continue
|
||||
if rule_no == 2:
|
||||
self.assertIn(rule['name'], firewall_topo['firewall_name'])
|
||||
self.assertIn(rule['name'], firewall_topo['firewall_name'],
|
||||
"Rule exists at position 2")
|
||||
break
|
||||
|
||||
def _check_firewall_rule_exists_at_backend(self, rules,
|
||||
firewall_rule_name):
|
||||
for rule in rules:
|
||||
if rule['name'] in firewall_rule_name:
|
||||
self.assertIn(rule['name'], firewall_rule_name)
|
||||
return True
|
||||
return False
|
||||
|
||||
def _create_firewall_rule_name(self, body):
|
||||
firewall_rule_name = body['firewall_rule']['name']
|
||||
firewall_rule_name = "Fwaas-" + firewall_rule_name
|
||||
return firewall_rule_name
|
||||
|
||||
def _create_firewall_basic_topo(self, router_type, policy=None):
|
||||
router = self.create_router_by_type(router_type)
|
||||
self.addCleanup(self._try_delete_router, router)
|
||||
body = self.fwaasv1_client.create_firewall_rule(
|
||||
name=data_utils.rand_name("fw-rule"),
|
||||
action="allow",
|
||||
protocol="tcp")
|
||||
fw_rule_id1 = body['firewall_rule']['id']
|
||||
firewall_name = self._create_firewall_rule_name(body)
|
||||
self.addCleanup(self._try_delete_rule, fw_rule_id1)
|
||||
# Create firewall policy
|
||||
if not policy:
|
||||
body = self.fwaasv1_client.create_firewall_policy(
|
||||
name=data_utils.rand_name("fw-policy"))
|
||||
fw_policy_id = body['firewall_policy']['id']
|
||||
self.addCleanup(self._try_delete_policy, fw_policy_id)
|
||||
# Insert rule to firewall policy
|
||||
self.fwaasv1_client.insert_firewall_rule_in_policy(
|
||||
fw_policy_id, fw_rule_id1, '', '')
|
||||
else:
|
||||
fw_policy_id = policy
|
||||
# Create firewall
|
||||
firewall_1 = self.fwaasv1_client.create_firewall(
|
||||
name=data_utils.rand_name("firewall"),
|
||||
firewall_policy_id=fw_policy_id,
|
||||
router_ids=[router['id']])
|
||||
created_firewall = firewall_1['firewall']
|
||||
self.addCleanup(self._try_delete_firewall, created_firewall['id'])
|
||||
# Wait for the firewall resource to become ready
|
||||
self._wait_until_ready(created_firewall['id'])
|
||||
firewall_topo = dict(router=router, firewall_name=firewall_name,
|
||||
fw_policy_id=fw_policy_id,
|
||||
fw_rule_id1=fw_rule_id1,
|
||||
firewall_id=created_firewall['id'])
|
||||
return firewall_topo
|
||||
|
||||
@test.attr(type='nsxv')
|
||||
@decorators.idempotent_id('da65de07-a60f-404d-ad1d-2d2c71a3b6a5')
|
||||
def test_firewall_add_delete_between_routers(self):
|
||||
@ -578,7 +641,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
if router['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules,
|
||||
@ -587,7 +650,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
firewall_topo['firewall_id'],
|
||||
router_ids=[router['id'], firewall_topo['router']['id']])
|
||||
self._wait_firewall_ready(firewall_topo['firewall_id'])
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo['firewall_name']))
|
||||
@ -596,7 +659,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo['firewall_name']))
|
||||
@ -611,7 +674,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo['firewall_name']))
|
||||
@ -627,7 +690,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
firewall_topo['fw_policy_id'], fw_rule_id2,
|
||||
firewall_topo['fw_rule_id1'], '')
|
||||
self._wait_firewall_ready(firewall_topo['firewall_id'])
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_name_2))
|
||||
@ -645,7 +708,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
self.addCleanup(self._try_delete_policy, firewall_topo['fw_policy_id'])
|
||||
self.addCleanup(self._try_delete_firewall,
|
||||
firewall_topo['firewall_id'])
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_name_3))
|
||||
@ -660,15 +723,15 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo['firewall_name']))
|
||||
self.fwaasv1_client.remove_firewall_rule_from_policy(
|
||||
firewall_topo['fw_policy_id'], firewall_topo['fw_rule_id1'])
|
||||
self.delete_firewall_and_wait(firewall_topo['firewall_id'])
|
||||
time.sleep(60)
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
False, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo['firewall_name']))
|
||||
@ -683,7 +746,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_info(edge_id)
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, firewall_topo['firewall_name']))
|
||||
@ -771,11 +834,59 @@ class FWaaSTestJSON(base.BaseNetworkTest):
|
||||
# Verify removal of rule from firewall policy
|
||||
self.assertNotIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id))
|
||||
|
||||
def _get_list_fw_rule_ids(self, fw_policy_id):
|
||||
fw_policy = self.fwaasv1_client.show_firewall_policy(
|
||||
fw_policy_id)
|
||||
return [ruleid for ruleid in fw_policy['firewall_policy']
|
||||
['firewall_rules']]
|
||||
@test.attr(type='nsxv')
|
||||
@decorators.idempotent_id('901dae30-b148-43d9-ac86-09777aeaba20')
|
||||
def test_update_firewall_name_at_backend_excl_edge(self):
|
||||
firewall_topo = self._create_firewall_basic_topo('exclusive')
|
||||
fw_rule_id = firewall_topo['fw_rule_id1']
|
||||
body = self.fwaasv1_client.update_firewall_rule(fw_rule_id,
|
||||
name="updated_rule")
|
||||
updated_fw_rule = body["firewall_rule"]
|
||||
self.assertEqual("updated_rule", updated_fw_rule['name'])
|
||||
edges = self.vsm.get_all_edges()
|
||||
for key in edges:
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, "Fwaas-updated_rule"))
|
||||
|
||||
@test.attr(type='nsxv')
|
||||
@decorators.idempotent_id('471ebc13-8e3b-4aca-85b8-747935bf0559')
|
||||
def test_update_firewall_name_at_backend_dist_edge(self):
|
||||
firewall_topo = self._create_firewall_basic_topo('distributed')
|
||||
fw_rule_id = firewall_topo['fw_rule_id1']
|
||||
body = self.fwaasv1_client.update_firewall_rule(fw_rule_id,
|
||||
name="updated_rule")
|
||||
updated_fw_rule = body["firewall_rule"]
|
||||
self.assertEqual("updated_rule", updated_fw_rule['name'])
|
||||
edges = self.vsm.get_all_edges()
|
||||
firewall_topo['router']['name'] += '-plr'
|
||||
for key in edges:
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
self.assertEqual(
|
||||
True, self._check_firewall_rule_exists_at_backend(
|
||||
rules, "Fwaas-updated_rule"))
|
||||
|
||||
@test.attr(type='nsxv')
|
||||
@decorators.idempotent_id('0bdc9670-17b8-4dd5-80c8-dc6e956fc6ef')
|
||||
def test_create_multiple_firewall_rules_check_at_backend(self):
|
||||
firewall_topo = self._create_firewall_advanced_topo('exclusive')
|
||||
edges = self.vsm.get_all_edges()
|
||||
for key in edges:
|
||||
if firewall_topo['router']['name'] in key['name']:
|
||||
edge_id = key['id']
|
||||
break
|
||||
firewall_rules = self.vsm.get_edge_firewall_rules(edge_id)
|
||||
total_rules = firewall_topo['rules_before'] + len(firewall_rules)
|
||||
self.assertGreaterEqual(total_rules, constants.NO_OF_ENTRIES,
|
||||
"Firewall Rules are greater than %s" %
|
||||
constants.NO_OF_ENTRIES)
|
||||
|
||||
@test.attr(type='nsxv')
|
||||
@decorators.idempotent_id('0249db39-6284-456a-9449-2adacdca4d3b')
|
||||
|
Loading…
Reference in New Issue
Block a user