diff --git a/vmware_nsx_tempest/common/constants.py b/vmware_nsx_tempest/common/constants.py index 1dff7aec13..d4089f2ad4 100644 --- a/vmware_nsx_tempest/common/constants.py +++ b/vmware_nsx_tempest/common/constants.py @@ -41,3 +41,6 @@ MD_BASE_URL = "http://169.254.169.254/" NSX_BACKEND_TIME_INTERVAL = 30 NSX_BACKEND_SMALL_TIME_INTERVAL = 10 NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5 + +# FWaaS +NO_OF_ENTRIES = 20 diff --git a/vmware_nsx_tempest/tests/nsxv/api/test_v1_fwaas.py b/vmware_nsx_tempest/tests/nsxv/api/test_v1_fwaas.py index eb7a7ceebd..f6ef7ec493 100644 --- a/vmware_nsx_tempest/tests/nsxv/api/test_v1_fwaas.py +++ b/vmware_nsx_tempest/tests/nsxv/api/test_v1_fwaas.py @@ -25,6 +25,7 @@ from tempest.lib import decorators from tempest.lib import exceptions as lib_exc from tempest import test +from vmware_nsx_tempest.common import constants from vmware_nsx_tempest.services import fwaas_client as FWAASC from vmware_nsx_tempest.services import nsxv_client @@ -121,7 +122,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): "status": status, } raise lib_exc.TimeoutException(msg) - time.sleep(1) + time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL) def _wait_firewall_ready(self, firewall_id): self._wait_firewall_while(firewall_id, @@ -189,6 +190,110 @@ class FWaaSTestJSON(base.BaseNetworkTest): m = ("Timed out waiting for firewall %s deleted" % fw_id) raise lib_exc.TimeoutException(m) + def _check_firewall_rule_exists_at_backend(self, rules, + firewall_rule_name): + for rule in rules: + if rule['name'] in firewall_rule_name: + self.assertIn(rule['name'], firewall_rule_name) + return True + return False + + def _create_firewall_rule_name(self, body): + firewall_rule_name = body['firewall_rule']['name'] + firewall_rule_name = "Fwaas-" + firewall_rule_name + return firewall_rule_name + + def _create_firewall_advanced_topo(self, router_type): + fw_rule_id_list = [] + router = self.create_router_by_type(router_type) + self.addCleanup(self._try_delete_router, router) + edges = self.vsm.get_all_edges() + for key in edges: + if router['name'] in key['name']: + edge_id = key['id'] + break + rules = self.vsm.get_edge_firewall_rules(edge_id) + rules_before = len(rules) + for rule_id in range(0, constants.NO_OF_ENTRIES): + if rule_id % 2 == 0: + action = "allow" + protocol = "tcp" + else: + action = "allow" + protocol = "udp" + firewall_rule = self.fwaasv1_client.create_firewall_rule( + name=data_utils.rand_name("fw-rule"), + action=action, + protocol=protocol) + fw_rule_id = firewall_rule['firewall_rule']['id'] + firewall_name = self._create_firewall_rule_name(firewall_rule) + self.addCleanup(self._try_delete_rule, fw_rule_id) + fw_rule_id_list.append(fw_rule_id) + # Update firewall policy + body = self.fwaasv1_client.create_firewall_policy( + name=data_utils.rand_name("fw-policy")) + fw_policy_id = body['firewall_policy']['id'] + self.addCleanup(self._try_delete_policy, fw_policy_id) + # Insert rule to firewall policy + for fw_rule_id in fw_rule_id_list: + self.fwaasv1_client.insert_firewall_rule_in_policy( + fw_policy_id, fw_rule_id, '', '') + firewall_1 = self.fwaasv1_client.create_firewall( + name=data_utils.rand_name("firewall"), + firewall_policy_id=fw_policy_id, + router_ids=[router['id']]) + created_firewall = firewall_1['firewall'] + self.addCleanup(self._try_delete_firewall, created_firewall['id']) + # Wait for the firewall resource to become ready + self._wait_until_ready(created_firewall['id']) + firewall_topo = dict(router=router, firewall_name=firewall_name, + fw_policy_id=fw_policy_id, + firewall_id=created_firewall['id'], + rules_before=rules_before) + return firewall_topo + + def _create_firewall_basic_topo(self, router_type, policy=None): + router = self.create_router_by_type(router_type) + self.addCleanup(self._try_delete_router, router) + body = self.fwaasv1_client.create_firewall_rule( + name=data_utils.rand_name("fw-rule"), + action="allow", + protocol="tcp") + fw_rule_id1 = body['firewall_rule']['id'] + firewall_name = self._create_firewall_rule_name(body) + self.addCleanup(self._try_delete_rule, fw_rule_id1) + # Create firewall policy + if not policy: + body = self.fwaasv1_client.create_firewall_policy( + name=data_utils.rand_name("fw-policy")) + fw_policy_id = body['firewall_policy']['id'] + self.addCleanup(self._try_delete_policy, fw_policy_id) + # Insert rule to firewall policy + self.fwaasv1_client.insert_firewall_rule_in_policy( + fw_policy_id, fw_rule_id1, '', '') + else: + fw_policy_id = policy + # Create firewall + firewall_1 = self.fwaasv1_client.create_firewall( + name=data_utils.rand_name("firewall"), + firewall_policy_id=fw_policy_id, + router_ids=[router['id']]) + created_firewall = firewall_1['firewall'] + self.addCleanup(self._try_delete_firewall, created_firewall['id']) + # Wait for the firewall resource to become ready + self._wait_until_ready(created_firewall['id']) + firewall_topo = dict(router=router, firewall_name=firewall_name, + fw_policy_id=fw_policy_id, + fw_rule_id1=fw_rule_id1, + firewall_id=created_firewall['id']) + return firewall_topo + + def _get_list_fw_rule_ids(self, fw_policy_id): + fw_policy = self.fwaasv1_client.show_firewall_policy( + fw_policy_id) + return [ruleid for ruleid in fw_policy['firewall_policy'] + ['firewall_rules']] + def create_router_by_type(self, router_type, name=None, **kwargs): routers_client = self.manager.routers_client router_name = name or data_utils.rand_name('mtz-') @@ -435,6 +540,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): self._create_firewall_basic_topo('distributed', firewall_topo1['fw_policy_id']) edges = self.vsm.get_all_edges() + firewall_topo2['router']['name'] += '-plr' for key in edges: if firewall_topo1['router']['name'] in key['name']: edge_id_excl = key['id'] @@ -442,11 +548,11 @@ class FWaaSTestJSON(base.BaseNetworkTest): edge_id_dist = key['id'] if edge_id_excl and edge_id_dist: break - rules = self.vsm.get_edge_firewall_info(edge_id_excl) + rules = self.vsm.get_edge_firewall_rules(edge_id_excl) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo1['firewall_name'])) - rules = self.vsm.get_edge_firewall_info(edge_id_dist) + rules = self.vsm.get_edge_firewall_rules(edge_id_dist) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo1['firewall_name'])) @@ -467,11 +573,13 @@ class FWaaSTestJSON(base.BaseNetworkTest): edge_id_dist = key['id'] if edge_id_excl and edge_id_dist: break - rules = self.vsm.get_edge_firewall_info(edge_id_excl) + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + rules = self.vsm.get_edge_firewall_rules(edge_id_excl) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo1['firewall_name'])) - rules = self.vsm.get_edge_firewall_info(edge_id_dist) + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + rules = self.vsm.get_edge_firewall_rules(edge_id_dist) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo2['firewall_name'])) @@ -504,65 +612,20 @@ class FWaaSTestJSON(base.BaseNetworkTest): if firewall_topo['router']['name'] in key['name']: edge_id = key['id'] break - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) for rule in rules: + if rule['name'] in ('VSERule', 'MDServiceIP', 'MDInterEdgeNet'): + continue if rule_no == 1: - self.assertIn(rule['name'], firewall_rule_name_2) + self.assertIn(rule['name'], firewall_rule_name_2, + "Rule exists at position 1") rule_no += rule_no continue if rule_no == 2: - self.assertIn(rule['name'], firewall_topo['firewall_name']) + self.assertIn(rule['name'], firewall_topo['firewall_name'], + "Rule exists at position 2") break - def _check_firewall_rule_exists_at_backend(self, rules, - firewall_rule_name): - for rule in rules: - if rule['name'] in firewall_rule_name: - self.assertIn(rule['name'], firewall_rule_name) - return True - return False - - def _create_firewall_rule_name(self, body): - firewall_rule_name = body['firewall_rule']['name'] - firewall_rule_name = "Fwaas-" + firewall_rule_name - return firewall_rule_name - - def _create_firewall_basic_topo(self, router_type, policy=None): - router = self.create_router_by_type(router_type) - self.addCleanup(self._try_delete_router, router) - body = self.fwaasv1_client.create_firewall_rule( - name=data_utils.rand_name("fw-rule"), - action="allow", - protocol="tcp") - fw_rule_id1 = body['firewall_rule']['id'] - firewall_name = self._create_firewall_rule_name(body) - self.addCleanup(self._try_delete_rule, fw_rule_id1) - # Create firewall policy - if not policy: - body = self.fwaasv1_client.create_firewall_policy( - name=data_utils.rand_name("fw-policy")) - fw_policy_id = body['firewall_policy']['id'] - self.addCleanup(self._try_delete_policy, fw_policy_id) - # Insert rule to firewall policy - self.fwaasv1_client.insert_firewall_rule_in_policy( - fw_policy_id, fw_rule_id1, '', '') - else: - fw_policy_id = policy - # Create firewall - firewall_1 = self.fwaasv1_client.create_firewall( - name=data_utils.rand_name("firewall"), - firewall_policy_id=fw_policy_id, - router_ids=[router['id']]) - created_firewall = firewall_1['firewall'] - self.addCleanup(self._try_delete_firewall, created_firewall['id']) - # Wait for the firewall resource to become ready - self._wait_until_ready(created_firewall['id']) - firewall_topo = dict(router=router, firewall_name=firewall_name, - fw_policy_id=fw_policy_id, - fw_rule_id1=fw_rule_id1, - firewall_id=created_firewall['id']) - return firewall_topo - @test.attr(type='nsxv') @decorators.idempotent_id('da65de07-a60f-404d-ad1d-2d2c71a3b6a5') def test_firewall_add_delete_between_routers(self): @@ -578,7 +641,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): if router['name'] in key['name']: edge_id = key['id'] break - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, @@ -587,7 +650,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): firewall_topo['firewall_id'], router_ids=[router['id'], firewall_topo['router']['id']]) self._wait_firewall_ready(firewall_topo['firewall_id']) - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo['firewall_name'])) @@ -596,7 +659,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): if firewall_topo['router']['name'] in key['name']: edge_id = key['id'] break - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo['firewall_name'])) @@ -611,7 +674,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): if firewall_topo['router']['name'] in key['name']: edge_id = key['id'] break - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo['firewall_name'])) @@ -627,7 +690,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): firewall_topo['fw_policy_id'], fw_rule_id2, firewall_topo['fw_rule_id1'], '') self._wait_firewall_ready(firewall_topo['firewall_id']) - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_name_2)) @@ -645,7 +708,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): self.addCleanup(self._try_delete_policy, firewall_topo['fw_policy_id']) self.addCleanup(self._try_delete_firewall, firewall_topo['firewall_id']) - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_name_3)) @@ -660,15 +723,15 @@ class FWaaSTestJSON(base.BaseNetworkTest): if firewall_topo['router']['name'] in key['name']: edge_id = key['id'] break - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo['firewall_name'])) self.fwaasv1_client.remove_firewall_rule_from_policy( firewall_topo['fw_policy_id'], firewall_topo['fw_rule_id1']) self.delete_firewall_and_wait(firewall_topo['firewall_id']) - time.sleep(60) - rules = self.vsm.get_edge_firewall_info(edge_id) + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( False, self._check_firewall_rule_exists_at_backend( rules, firewall_topo['firewall_name'])) @@ -683,7 +746,7 @@ class FWaaSTestJSON(base.BaseNetworkTest): if firewall_topo['router']['name'] in key['name']: edge_id = key['id'] break - rules = self.vsm.get_edge_firewall_info(edge_id) + rules = self.vsm.get_edge_firewall_rules(edge_id) self.assertEqual( True, self._check_firewall_rule_exists_at_backend( rules, firewall_topo['firewall_name'])) @@ -771,11 +834,59 @@ class FWaaSTestJSON(base.BaseNetworkTest): # Verify removal of rule from firewall policy self.assertNotIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id)) - def _get_list_fw_rule_ids(self, fw_policy_id): - fw_policy = self.fwaasv1_client.show_firewall_policy( - fw_policy_id) - return [ruleid for ruleid in fw_policy['firewall_policy'] - ['firewall_rules']] + @test.attr(type='nsxv') + @decorators.idempotent_id('901dae30-b148-43d9-ac86-09777aeaba20') + def test_update_firewall_name_at_backend_excl_edge(self): + firewall_topo = self._create_firewall_basic_topo('exclusive') + fw_rule_id = firewall_topo['fw_rule_id1'] + body = self.fwaasv1_client.update_firewall_rule(fw_rule_id, + name="updated_rule") + updated_fw_rule = body["firewall_rule"] + self.assertEqual("updated_rule", updated_fw_rule['name']) + edges = self.vsm.get_all_edges() + for key in edges: + if firewall_topo['router']['name'] in key['name']: + edge_id = key['id'] + break + rules = self.vsm.get_edge_firewall_rules(edge_id) + self.assertEqual( + True, self._check_firewall_rule_exists_at_backend( + rules, "Fwaas-updated_rule")) + + @test.attr(type='nsxv') + @decorators.idempotent_id('471ebc13-8e3b-4aca-85b8-747935bf0559') + def test_update_firewall_name_at_backend_dist_edge(self): + firewall_topo = self._create_firewall_basic_topo('distributed') + fw_rule_id = firewall_topo['fw_rule_id1'] + body = self.fwaasv1_client.update_firewall_rule(fw_rule_id, + name="updated_rule") + updated_fw_rule = body["firewall_rule"] + self.assertEqual("updated_rule", updated_fw_rule['name']) + edges = self.vsm.get_all_edges() + firewall_topo['router']['name'] += '-plr' + for key in edges: + if firewall_topo['router']['name'] in key['name']: + edge_id = key['id'] + break + rules = self.vsm.get_edge_firewall_rules(edge_id) + self.assertEqual( + True, self._check_firewall_rule_exists_at_backend( + rules, "Fwaas-updated_rule")) + + @test.attr(type='nsxv') + @decorators.idempotent_id('0bdc9670-17b8-4dd5-80c8-dc6e956fc6ef') + def test_create_multiple_firewall_rules_check_at_backend(self): + firewall_topo = self._create_firewall_advanced_topo('exclusive') + edges = self.vsm.get_all_edges() + for key in edges: + if firewall_topo['router']['name'] in key['name']: + edge_id = key['id'] + break + firewall_rules = self.vsm.get_edge_firewall_rules(edge_id) + total_rules = firewall_topo['rules_before'] + len(firewall_rules) + self.assertGreaterEqual(total_rules, constants.NO_OF_ENTRIES, + "Firewall Rules are greater than %s" % + constants.NO_OF_ENTRIES) @test.attr(type='nsxv') @decorators.idempotent_id('0249db39-6284-456a-9449-2adacdca4d3b')