[Tempest]: Adding of more cases for FWaaS

Added update cases of firewall rule
Added multiple addtion of firewall rule to policy

Change-Id: I973a0a36856fc9ddf4dc01fcf4d0134d395e5186
This commit is contained in:
Puneet Arora 2017-05-16 20:13:26 +00:00
parent 8f5260689e
commit 45974448ed
2 changed files with 186 additions and 72 deletions

View File

@ -41,3 +41,6 @@ MD_BASE_URL = "http://169.254.169.254/"
NSX_BACKEND_TIME_INTERVAL = 30 NSX_BACKEND_TIME_INTERVAL = 30
NSX_BACKEND_SMALL_TIME_INTERVAL = 10 NSX_BACKEND_SMALL_TIME_INTERVAL = 10
NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5 NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5
# FWaaS
NO_OF_ENTRIES = 20

View File

@ -25,6 +25,7 @@ from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc from tempest.lib import exceptions as lib_exc
from tempest import test from tempest import test
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.services import fwaas_client as FWAASC from vmware_nsx_tempest.services import fwaas_client as FWAASC
from vmware_nsx_tempest.services import nsxv_client from vmware_nsx_tempest.services import nsxv_client
@ -121,7 +122,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
"status": status, "status": status,
} }
raise lib_exc.TimeoutException(msg) raise lib_exc.TimeoutException(msg)
time.sleep(1) time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
def _wait_firewall_ready(self, firewall_id): def _wait_firewall_ready(self, firewall_id):
self._wait_firewall_while(firewall_id, self._wait_firewall_while(firewall_id,
@ -189,6 +190,110 @@ class FWaaSTestJSON(base.BaseNetworkTest):
m = ("Timed out waiting for firewall %s deleted" % fw_id) m = ("Timed out waiting for firewall %s deleted" % fw_id)
raise lib_exc.TimeoutException(m) raise lib_exc.TimeoutException(m)
def _check_firewall_rule_exists_at_backend(self, rules,
firewall_rule_name):
for rule in rules:
if rule['name'] in firewall_rule_name:
self.assertIn(rule['name'], firewall_rule_name)
return True
return False
def _create_firewall_rule_name(self, body):
firewall_rule_name = body['firewall_rule']['name']
firewall_rule_name = "Fwaas-" + firewall_rule_name
return firewall_rule_name
def _create_firewall_advanced_topo(self, router_type):
fw_rule_id_list = []
router = self.create_router_by_type(router_type)
self.addCleanup(self._try_delete_router, router)
edges = self.vsm.get_all_edges()
for key in edges:
if router['name'] in key['name']:
edge_id = key['id']
break
rules = self.vsm.get_edge_firewall_rules(edge_id)
rules_before = len(rules)
for rule_id in range(0, constants.NO_OF_ENTRIES):
if rule_id % 2 == 0:
action = "allow"
protocol = "tcp"
else:
action = "allow"
protocol = "udp"
firewall_rule = self.fwaasv1_client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action=action,
protocol=protocol)
fw_rule_id = firewall_rule['firewall_rule']['id']
firewall_name = self._create_firewall_rule_name(firewall_rule)
self.addCleanup(self._try_delete_rule, fw_rule_id)
fw_rule_id_list.append(fw_rule_id)
# Update firewall policy
body = self.fwaasv1_client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
# Insert rule to firewall policy
for fw_rule_id in fw_rule_id_list:
self.fwaasv1_client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id, '', '')
firewall_1 = self.fwaasv1_client.create_firewall(
name=data_utils.rand_name("firewall"),
firewall_policy_id=fw_policy_id,
router_ids=[router['id']])
created_firewall = firewall_1['firewall']
self.addCleanup(self._try_delete_firewall, created_firewall['id'])
# Wait for the firewall resource to become ready
self._wait_until_ready(created_firewall['id'])
firewall_topo = dict(router=router, firewall_name=firewall_name,
fw_policy_id=fw_policy_id,
firewall_id=created_firewall['id'],
rules_before=rules_before)
return firewall_topo
def _create_firewall_basic_topo(self, router_type, policy=None):
router = self.create_router_by_type(router_type)
self.addCleanup(self._try_delete_router, router)
body = self.fwaasv1_client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol="tcp")
fw_rule_id1 = body['firewall_rule']['id']
firewall_name = self._create_firewall_rule_name(body)
self.addCleanup(self._try_delete_rule, fw_rule_id1)
# Create firewall policy
if not policy:
body = self.fwaasv1_client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
# Insert rule to firewall policy
self.fwaasv1_client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
else:
fw_policy_id = policy
# Create firewall
firewall_1 = self.fwaasv1_client.create_firewall(
name=data_utils.rand_name("firewall"),
firewall_policy_id=fw_policy_id,
router_ids=[router['id']])
created_firewall = firewall_1['firewall']
self.addCleanup(self._try_delete_firewall, created_firewall['id'])
# Wait for the firewall resource to become ready
self._wait_until_ready(created_firewall['id'])
firewall_topo = dict(router=router, firewall_name=firewall_name,
fw_policy_id=fw_policy_id,
fw_rule_id1=fw_rule_id1,
firewall_id=created_firewall['id'])
return firewall_topo
def _get_list_fw_rule_ids(self, fw_policy_id):
fw_policy = self.fwaasv1_client.show_firewall_policy(
fw_policy_id)
return [ruleid for ruleid in fw_policy['firewall_policy']
['firewall_rules']]
def create_router_by_type(self, router_type, name=None, **kwargs): def create_router_by_type(self, router_type, name=None, **kwargs):
routers_client = self.manager.routers_client routers_client = self.manager.routers_client
router_name = name or data_utils.rand_name('mtz-') router_name = name or data_utils.rand_name('mtz-')
@ -435,6 +540,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
self._create_firewall_basic_topo('distributed', self._create_firewall_basic_topo('distributed',
firewall_topo1['fw_policy_id']) firewall_topo1['fw_policy_id'])
edges = self.vsm.get_all_edges() edges = self.vsm.get_all_edges()
firewall_topo2['router']['name'] += '-plr'
for key in edges: for key in edges:
if firewall_topo1['router']['name'] in key['name']: if firewall_topo1['router']['name'] in key['name']:
edge_id_excl = key['id'] edge_id_excl = key['id']
@ -442,11 +548,11 @@ class FWaaSTestJSON(base.BaseNetworkTest):
edge_id_dist = key['id'] edge_id_dist = key['id']
if edge_id_excl and edge_id_dist: if edge_id_excl and edge_id_dist:
break break
rules = self.vsm.get_edge_firewall_info(edge_id_excl) rules = self.vsm.get_edge_firewall_rules(edge_id_excl)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo1['firewall_name'])) rules, firewall_topo1['firewall_name']))
rules = self.vsm.get_edge_firewall_info(edge_id_dist) rules = self.vsm.get_edge_firewall_rules(edge_id_dist)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo1['firewall_name'])) rules, firewall_topo1['firewall_name']))
@ -467,11 +573,13 @@ class FWaaSTestJSON(base.BaseNetworkTest):
edge_id_dist = key['id'] edge_id_dist = key['id']
if edge_id_excl and edge_id_dist: if edge_id_excl and edge_id_dist:
break break
rules = self.vsm.get_edge_firewall_info(edge_id_excl) time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
rules = self.vsm.get_edge_firewall_rules(edge_id_excl)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo1['firewall_name'])) rules, firewall_topo1['firewall_name']))
rules = self.vsm.get_edge_firewall_info(edge_id_dist) time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
rules = self.vsm.get_edge_firewall_rules(edge_id_dist)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo2['firewall_name'])) rules, firewall_topo2['firewall_name']))
@ -504,65 +612,20 @@ class FWaaSTestJSON(base.BaseNetworkTest):
if firewall_topo['router']['name'] in key['name']: if firewall_topo['router']['name'] in key['name']:
edge_id = key['id'] edge_id = key['id']
break break
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
for rule in rules: for rule in rules:
if rule['name'] in ('VSERule', 'MDServiceIP', 'MDInterEdgeNet'):
continue
if rule_no == 1: if rule_no == 1:
self.assertIn(rule['name'], firewall_rule_name_2) self.assertIn(rule['name'], firewall_rule_name_2,
"Rule exists at position 1")
rule_no += rule_no rule_no += rule_no
continue continue
if rule_no == 2: if rule_no == 2:
self.assertIn(rule['name'], firewall_topo['firewall_name']) self.assertIn(rule['name'], firewall_topo['firewall_name'],
"Rule exists at position 2")
break break
def _check_firewall_rule_exists_at_backend(self, rules,
firewall_rule_name):
for rule in rules:
if rule['name'] in firewall_rule_name:
self.assertIn(rule['name'], firewall_rule_name)
return True
return False
def _create_firewall_rule_name(self, body):
firewall_rule_name = body['firewall_rule']['name']
firewall_rule_name = "Fwaas-" + firewall_rule_name
return firewall_rule_name
def _create_firewall_basic_topo(self, router_type, policy=None):
router = self.create_router_by_type(router_type)
self.addCleanup(self._try_delete_router, router)
body = self.fwaasv1_client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol="tcp")
fw_rule_id1 = body['firewall_rule']['id']
firewall_name = self._create_firewall_rule_name(body)
self.addCleanup(self._try_delete_rule, fw_rule_id1)
# Create firewall policy
if not policy:
body = self.fwaasv1_client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
# Insert rule to firewall policy
self.fwaasv1_client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
else:
fw_policy_id = policy
# Create firewall
firewall_1 = self.fwaasv1_client.create_firewall(
name=data_utils.rand_name("firewall"),
firewall_policy_id=fw_policy_id,
router_ids=[router['id']])
created_firewall = firewall_1['firewall']
self.addCleanup(self._try_delete_firewall, created_firewall['id'])
# Wait for the firewall resource to become ready
self._wait_until_ready(created_firewall['id'])
firewall_topo = dict(router=router, firewall_name=firewall_name,
fw_policy_id=fw_policy_id,
fw_rule_id1=fw_rule_id1,
firewall_id=created_firewall['id'])
return firewall_topo
@test.attr(type='nsxv') @test.attr(type='nsxv')
@decorators.idempotent_id('da65de07-a60f-404d-ad1d-2d2c71a3b6a5') @decorators.idempotent_id('da65de07-a60f-404d-ad1d-2d2c71a3b6a5')
def test_firewall_add_delete_between_routers(self): def test_firewall_add_delete_between_routers(self):
@ -578,7 +641,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
if router['name'] in key['name']: if router['name'] in key['name']:
edge_id = key['id'] edge_id = key['id']
break break
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, rules,
@ -587,7 +650,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
firewall_topo['firewall_id'], firewall_topo['firewall_id'],
router_ids=[router['id'], firewall_topo['router']['id']]) router_ids=[router['id'], firewall_topo['router']['id']])
self._wait_firewall_ready(firewall_topo['firewall_id']) self._wait_firewall_ready(firewall_topo['firewall_id'])
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo['firewall_name'])) rules, firewall_topo['firewall_name']))
@ -596,7 +659,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
if firewall_topo['router']['name'] in key['name']: if firewall_topo['router']['name'] in key['name']:
edge_id = key['id'] edge_id = key['id']
break break
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo['firewall_name'])) rules, firewall_topo['firewall_name']))
@ -611,7 +674,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
if firewall_topo['router']['name'] in key['name']: if firewall_topo['router']['name'] in key['name']:
edge_id = key['id'] edge_id = key['id']
break break
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo['firewall_name'])) rules, firewall_topo['firewall_name']))
@ -627,7 +690,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
firewall_topo['fw_policy_id'], fw_rule_id2, firewall_topo['fw_policy_id'], fw_rule_id2,
firewall_topo['fw_rule_id1'], '') firewall_topo['fw_rule_id1'], '')
self._wait_firewall_ready(firewall_topo['firewall_id']) self._wait_firewall_ready(firewall_topo['firewall_id'])
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_name_2)) rules, firewall_name_2))
@ -645,7 +708,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
self.addCleanup(self._try_delete_policy, firewall_topo['fw_policy_id']) self.addCleanup(self._try_delete_policy, firewall_topo['fw_policy_id'])
self.addCleanup(self._try_delete_firewall, self.addCleanup(self._try_delete_firewall,
firewall_topo['firewall_id']) firewall_topo['firewall_id'])
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_name_3)) rules, firewall_name_3))
@ -660,15 +723,15 @@ class FWaaSTestJSON(base.BaseNetworkTest):
if firewall_topo['router']['name'] in key['name']: if firewall_topo['router']['name'] in key['name']:
edge_id = key['id'] edge_id = key['id']
break break
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo['firewall_name'])) rules, firewall_topo['firewall_name']))
self.fwaasv1_client.remove_firewall_rule_from_policy( self.fwaasv1_client.remove_firewall_rule_from_policy(
firewall_topo['fw_policy_id'], firewall_topo['fw_rule_id1']) firewall_topo['fw_policy_id'], firewall_topo['fw_rule_id1'])
self.delete_firewall_and_wait(firewall_topo['firewall_id']) self.delete_firewall_and_wait(firewall_topo['firewall_id'])
time.sleep(60) time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
False, self._check_firewall_rule_exists_at_backend( False, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo['firewall_name'])) rules, firewall_topo['firewall_name']))
@ -683,7 +746,7 @@ class FWaaSTestJSON(base.BaseNetworkTest):
if firewall_topo['router']['name'] in key['name']: if firewall_topo['router']['name'] in key['name']:
edge_id = key['id'] edge_id = key['id']
break break
rules = self.vsm.get_edge_firewall_info(edge_id) rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual( self.assertEqual(
True, self._check_firewall_rule_exists_at_backend( True, self._check_firewall_rule_exists_at_backend(
rules, firewall_topo['firewall_name'])) rules, firewall_topo['firewall_name']))
@ -771,11 +834,59 @@ class FWaaSTestJSON(base.BaseNetworkTest):
# Verify removal of rule from firewall policy # Verify removal of rule from firewall policy
self.assertNotIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id)) self.assertNotIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id))
def _get_list_fw_rule_ids(self, fw_policy_id): @test.attr(type='nsxv')
fw_policy = self.fwaasv1_client.show_firewall_policy( @decorators.idempotent_id('901dae30-b148-43d9-ac86-09777aeaba20')
fw_policy_id) def test_update_firewall_name_at_backend_excl_edge(self):
return [ruleid for ruleid in fw_policy['firewall_policy'] firewall_topo = self._create_firewall_basic_topo('exclusive')
['firewall_rules']] fw_rule_id = firewall_topo['fw_rule_id1']
body = self.fwaasv1_client.update_firewall_rule(fw_rule_id,
name="updated_rule")
updated_fw_rule = body["firewall_rule"]
self.assertEqual("updated_rule", updated_fw_rule['name'])
edges = self.vsm.get_all_edges()
for key in edges:
if firewall_topo['router']['name'] in key['name']:
edge_id = key['id']
break
rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual(
True, self._check_firewall_rule_exists_at_backend(
rules, "Fwaas-updated_rule"))
@test.attr(type='nsxv')
@decorators.idempotent_id('471ebc13-8e3b-4aca-85b8-747935bf0559')
def test_update_firewall_name_at_backend_dist_edge(self):
firewall_topo = self._create_firewall_basic_topo('distributed')
fw_rule_id = firewall_topo['fw_rule_id1']
body = self.fwaasv1_client.update_firewall_rule(fw_rule_id,
name="updated_rule")
updated_fw_rule = body["firewall_rule"]
self.assertEqual("updated_rule", updated_fw_rule['name'])
edges = self.vsm.get_all_edges()
firewall_topo['router']['name'] += '-plr'
for key in edges:
if firewall_topo['router']['name'] in key['name']:
edge_id = key['id']
break
rules = self.vsm.get_edge_firewall_rules(edge_id)
self.assertEqual(
True, self._check_firewall_rule_exists_at_backend(
rules, "Fwaas-updated_rule"))
@test.attr(type='nsxv')
@decorators.idempotent_id('0bdc9670-17b8-4dd5-80c8-dc6e956fc6ef')
def test_create_multiple_firewall_rules_check_at_backend(self):
firewall_topo = self._create_firewall_advanced_topo('exclusive')
edges = self.vsm.get_all_edges()
for key in edges:
if firewall_topo['router']['name'] in key['name']:
edge_id = key['id']
break
firewall_rules = self.vsm.get_edge_firewall_rules(edge_id)
total_rules = firewall_topo['rules_before'] + len(firewall_rules)
self.assertGreaterEqual(total_rules, constants.NO_OF_ENTRIES,
"Firewall Rules are greater than %s" %
constants.NO_OF_ENTRIES)
@test.attr(type='nsxv') @test.attr(type='nsxv')
@decorators.idempotent_id('0249db39-6284-456a-9449-2adacdca4d3b') @decorators.idempotent_id('0249db39-6284-456a-9449-2adacdca4d3b')