From 33c2f4b645f87dc862a39d558a33b9d738b357ec Mon Sep 17 00:00:00 2001 From: shubhamk Date: Thu, 8 Aug 2019 12:42:14 +0000 Subject: [PATCH] p0 automation for below epics - Allow change of transit subnet CIDR - Advertise all static route on OpenStack Neutron Router (no NAT) - Rule tag editing Change-Id: If038941ef7f240aeeaa9692eb65cb754741a4a62 --- .../nsxv3/api/test_nsx_security_groups.py | 43 ++++ .../tests/scenario/test_new_case_coverage.py | 223 +++++++++++++++++- 2 files changed, 264 insertions(+), 2 deletions(-) diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_security_groups.py b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_security_groups.py index 60f6e1c..375d6a2 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_security_groups.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_security_groups.py @@ -380,3 +380,46 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest): nsx_dfw_section, secgroup_rule['id']) self.assertIsNone(nsx_dfw_rule) + + @decorators.idempotent_id('c7d434f6-3553-5c8e-bd95-8b1f0a860fb4') + def test_check_nsx_security_group_rule_tag_at_backend(self): + """ + Create security group and secy=urity group rule + check security group rule tag at the backend + """ + # Create a security group + create_body, _ = self._create_security_group() + secgroup = create_body['security_group'] + dfw_error_msg = "Firewall section not found for %s!" % secgroup['name'] + if CONF.network.backend == 'nsxp': + nsx_nsgroup_policy, nsx_dfw_section_policy,\ + nsx_nsgroup, nsx_dfw_section = \ + self._wait_till_firewall_gets_realize(secgroup, + dfw_error_msg, + 'default') + else: + nsx_nsgroup, nsx_dfw_section = \ + self._wait_till_firewall_gets_realize(secgroup, + dfw_error_msg) + # Create a security group rule + client = self.security_group_rules_client + rule_create_body = client.create_security_group_rule( + security_group_id=secgroup['id'], + protocol='tcp', + direction='ingress', + port_range_min=22, + port_range_max=22, + ethertype=self.ethertype + ) + secgroup_rule = rule_create_body['security_group_rule'] + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_dfw_rule = self.nsxp.get_firewall_section_rule( + nsx_dfw_section_policy, + secgroup_rule['id'], 'default') + self.assertIsNotNone(nsx_dfw_rule) + nsx_dfw_rule = self.nsx.get_firewall_section_rule( + nsx_dfw_section, + secgroup_rule['id']) + self.assertIsNotNone(nsx_dfw_rule) + self.assertEqual(nsx_dfw_rule['rule_tag'], secgroup_rule['project_id']) diff --git a/vmware_nsx_tempest_plugin/tests/scenario/test_new_case_coverage.py b/vmware_nsx_tempest_plugin/tests/scenario/test_new_case_coverage.py index a3b413f..e082cb4 100644 --- a/vmware_nsx_tempest_plugin/tests/scenario/test_new_case_coverage.py +++ b/vmware_nsx_tempest_plugin/tests/scenario/test_new_case_coverage.py @@ -542,9 +542,9 @@ class TestNewCase(feature_manager.FeatureManager): time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) nsxport_mac_learning = self._get_nsx_mac_learning_enabled(port['port']) self.assertFalse(port['port']['port_security_enabled'], - "Port security is enabled") + "Port security is enabled") self.assertTrue(port['port']['mac_learning_enabled'], - "Mac Learning is not enabled") + "Mac Learning is not enabled") self.assertEqual(nsxport_mac_learning, port['port']['mac_learning_enabled'], "OS and NSX mac learn states don't match") @@ -1326,3 +1326,222 @@ class TestNewCase(feature_manager.FeatureManager): loadbalancer_id=lb_id, protocol="TERMINATED_HTTPS", protocol_port="443", name=listener_name, default_tls_container_ref=container_ref) + + @decorators.attr(type='nsxp') + @decorators.idempotent_id('3337127a-12ee-9014-b217-23455dbb45c3') + def test_check_nsx_security_group_rule_tag_N_S_E_W_traffic(self): + """ + Create security group and security group rule, + Check security group rule tag at backend, + Attach security group to vm port and check + traffic + """ + topology_dict = self.create_topo_single_network( + "test_secret", create_instance=False) + self.network_state = topology_dict['network_state'] + self.subnet_state = topology_dict['subnet_state'] + sec_rule_client = self.sec_rule_client + sec_client = self.sec_client + kwargs = dict(tenant_id=self.network_state['tenant_id'], + security_group_rules_client=sec_rule_client, + security_groups_client=sec_client) + self.sg = self.create_topology_security_group(**kwargs) + nsx_dfw_section = self.nsx.get_firewall_section( + self.sg['name'], self.sg['id'], nsxp=True) + for rule in self.sg['security_group_rules']: + nsx_dfw_rule = self.nsx.get_firewall_section_rule( + nsx_dfw_section, + rule['id']) + self.assertIsNotNone(nsx_dfw_rule) + self.assertEqual(nsx_dfw_rule['rule_tag'], rule['project_id']) + image_id = self.get_glance_image_id(['cirros', "esx"]) + security_groups = [{'name': self.sg['name']}] + self.create_topology_instance( + "state_vm_1", [self.network_state], + create_floating_ip=True, image_id=image_id, + security_groups=security_groups, clients=self.cmgr_adm) + self.create_topology_instance( + "state_vm_2", [self.network_state], + create_floating_ip=True, image_id=image_id, + security_groups=security_groups, clients=self.cmgr_adm) + self.check_cross_network_connectivity( + self.network_state, + self.servers_details.get("state_vm_1").floating_ips[0], + self.servers_details.get("state_vm_1").server, should_connect=True) + self.check_cross_network_connectivity( + self.network_state, + self.servers_details.get("state_vm_2").floating_ips[0], + self.servers_details.get("state_vm_2").server, should_connect=True) + # Verify fip ping N-S traffic + self.verify_ping_own_fip(self.servers_details.get("state_vm_1").server) + self.verify_ping_own_fip(self.servers_details.get("state_vm_2").server) + + @decorators.idempotent_id('3337127a-02dd-9014-b217-23455dbb45c3') + def test_create_subnet_with_default_transit_subnet_negative(self): + """ + Create subnet with default transit network + "100.64.0.0/16" it should fail + """ + network_name = data_utils.rand_name(name='tempest-net') + subnet_name = data_utils.rand_name(name='tempest-subnet') + network_state = self.create_topology_network(network_name) + self.assertRaises(exceptions.BadRequest, self.create_topology_subnet, + subnet_name, network_state, cidr='100.64.0.0/16') + + @decorators.idempotent_id('3337127a-02dd-8905-b217-23455dbb45c3') + def test_create_subnet_with_transit_subnet_check_traffic(self): + """ + Create subnet with default transit network when different + transit network is given in tier0, it should create and + check traffic + """ + rtr_name = data_utils.rand_name(name='tempest-router') + kwargs = {} + router_state = self.create_topology_router(rtr_name, + set_gateway=True, + **kwargs) + network_name = data_utils.rand_name(name='tempest-net') + subnet_name = data_utils.rand_name(name='tempest-subnet') + network_state = self.create_topology_network(network_name) + self.create_topology_subnet( + subnet_name, network_state, cidr='100.70.0.0/16', + router_id=router_state["id"]) + sec_rule_client = self.sec_rule_client + sec_client = self.sec_client + kwargs = dict(tenant_id=network_state['tenant_id'], + security_group_rules_client=sec_rule_client, + security_groups_client=sec_client) + self.sg = self.create_topology_security_group(**kwargs) + image_id = self.get_glance_image_id(['cirros', "esx"]) + security_groups = [{'name': self.sg['name']}] + self.create_topology_instance( + "state_vm_1", [network_state], + create_floating_ip=True, image_id=image_id, + security_groups=security_groups, clients=self.cmgr_adm) + self.create_topology_instance( + "state_vm_2", [network_state], + create_floating_ip=True, image_id=image_id, + security_groups=security_groups, clients=self.cmgr_adm) + self.check_cross_network_connectivity( + network_state, + self.servers_details.get("state_vm_1").floating_ips[0], + self.servers_details.get("state_vm_1").server, should_connect=True) + self.check_cross_network_connectivity( + network_state, + self.servers_details.get("state_vm_2").floating_ips[0], + self.servers_details.get("state_vm_2").server, should_connect=True) + # Verify fip ping N-S traffic + self.verify_ping_own_fip(self.servers_details.get("state_vm_1").server) + self.verify_ping_own_fip(self.servers_details.get("state_vm_2").server) + + @decorators.idempotent_id('3337127a-12ee-9016-b217-12344caa34b2') + def test_check_static_route_advertised_or_not(self): + """ + Check it should advertise static route with no-not + router + """ + kwargs = {"enable_snat": False} + router_state_1 = self.create_topology_router(set_gateway=True, + routers_client=self. + cmgr_adm.routers_client, + **kwargs) + network_lbaas_1 = self.create_topology_network( + "network_lbaas", networks_client=self.cmgr_adm.networks_client) + subnet_lbaas = self.create_topology_subnet( + "subnet_lbaas", network_lbaas_1, router_id=router_state_1["id"], + routers_client=self.cmgr_adm.routers_client, + subnets_client=self.cmgr_adm.subnets_client) + next_hop = subnet_lbaas['allocation_pools'][0]['end'] + routes = [{ + "destination": "40.0.0.0/24", + "nexthop": next_hop + }] + self.cmgr_adm.routers_client.update_router( + router_state_1['id'], routes=routes) + self.addCleanup( + self.cmgr_adm.routers_client.update_router, + router_state_1['id'], routes=[]) + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips( + router_state_1['name'], router_state_1['id']) + route_present = False + for advertised_net in nsx_router_nat_rules['advertisedNetworks']: + if len(advertised_net['networks']) > 0: + if "40.0.0.0" in advertised_net[ + 'networks'][0]['network'] and\ + advertised_net['networks'][ + 0]['advertiseRouteType'] == 'T1_STATIC' and\ + advertised_net['networks'][0]['advertiseAllow']: + route_present = True + self.assertEqual(True, route_present, 'Static Route is not advertised') + self.cmgr_adm.routers_client.update_router( + router_state_1['id'], routes=[]) + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips( + router_state_1['name'], router_state_1['id']) + route_present = False + for advertised_net in nsx_router_nat_rules['advertisedNetworks']: + if len(advertised_net['networks']) > 0: + if "40.0.0.0" in advertised_net[ + 'networks'][0]['network'] and\ + advertised_net['networks'][ + 0]['advertiseRouteType'] == 'T1_STATIC' and\ + advertised_net['networks'][0]['advertiseAllow']: + route_present = True + self.assertEqual(False, route_present, 'Static Route is advertised') + + @decorators.idempotent_id('3337127a-02dd-8905-b217-12344caa34b2') + def test_check_static_route_advertised_or_not_snat_enabled_router(self): + """ + Check it should not advertise static route with snat + enabled router + """ + kwargs = {"enable_snat": True} + router_state_1 = self.create_topology_router(set_gateway=True, + routers_client=self. + cmgr_adm.routers_client, + **kwargs) + network_lbaas_1 = self.create_topology_network( + "network_lbaas", networks_client=self.cmgr_adm.networks_client) + subnet_lbaas = self.create_topology_subnet( + "subnet_lbaas", network_lbaas_1, router_id=router_state_1["id"], + routers_client=self.cmgr_adm.routers_client, + subnets_client=self.cmgr_adm.subnets_client) + next_hop = subnet_lbaas['allocation_pools'][0]['end'] + routes = [{ + "destination": "40.0.0.0/24", + "nexthop": next_hop + }] + self.cmgr_adm.routers_client.update_router( + router_state_1['id'], routes=routes) + self.addCleanup(self.cmgr_adm.routers_client.update_router, + router_state_1['id'], routes=[]) + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips( + router_state_1['name'], router_state_1['id']) + route_not_advertise = False + for advertised_net in nsx_router_nat_rules['advertisedNetworks']: + if len(advertised_net['networks']) > 0: + if "40.0.0.0" in advertised_net[ + 'networks'][0]['network'] and\ + advertised_net['networks'][ + 0]['advertiseRouteType'] == 'T1_STATIC' and\ + advertised_net['networks'][0]['advertiseAllow'] is\ + False: + route_not_advertise = True + self.assertEqual(True, route_not_advertise, + 'Static Route is advertised') + self.cmgr_adm.routers_client.update_router( + router_state_1['id'], routes=[]) + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips( + router_state_1['name'], router_state_1['id']) + route_present = False + for advertised_net in nsx_router_nat_rules['advertisedNetworks']: + if len(advertised_net['networks']) > 0: + if "40.0.0.0" in advertised_net[ + 'networks'][0]['network'] and\ + advertised_net['networks'][ + 0]['advertiseRouteType'] == 'T1_STATIC': + route_present = True + self.assertEqual(False, route_present, 'Static Route is Present')