p0 automation for below epics

- Allow change of transit subnet CIDR
- Advertise all static route on OpenStack Neutron Router (no NAT)
- Rule tag editing

Change-Id: If038941ef7f240aeeaa9692eb65cb754741a4a62
This commit is contained in:
shubhamk 2019-08-08 12:42:14 +00:00
parent b0ff86bb71
commit 33c2f4b645
2 changed files with 264 additions and 2 deletions

View File

@ -380,3 +380,46 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
nsx_dfw_section,
secgroup_rule['id'])
self.assertIsNone(nsx_dfw_rule)
@decorators.idempotent_id('c7d434f6-3553-5c8e-bd95-8b1f0a860fb4')
def test_check_nsx_security_group_rule_tag_at_backend(self):
"""
Create security group and secy=urity group rule
check security group rule tag at the backend
"""
# Create a security group
create_body, _ = self._create_security_group()
secgroup = create_body['security_group']
dfw_error_msg = "Firewall section not found for %s!" % secgroup['name']
if CONF.network.backend == 'nsxp':
nsx_nsgroup_policy, nsx_dfw_section_policy,\
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg,
'default')
else:
nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg)
# Create a security group rule
client = self.security_group_rules_client
rule_create_body = client.create_security_group_rule(
security_group_id=secgroup['id'],
protocol='tcp',
direction='ingress',
port_range_min=22,
port_range_max=22,
ethertype=self.ethertype
)
secgroup_rule = rule_create_body['security_group_rule']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy,
secgroup_rule['id'], 'default')
self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
secgroup_rule['id'])
self.assertIsNotNone(nsx_dfw_rule)
self.assertEqual(nsx_dfw_rule['rule_tag'], secgroup_rule['project_id'])

View File

@ -542,9 +542,9 @@ class TestNewCase(feature_manager.FeatureManager):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsxport_mac_learning = self._get_nsx_mac_learning_enabled(port['port'])
self.assertFalse(port['port']['port_security_enabled'],
"Port security is enabled")
"Port security is enabled")
self.assertTrue(port['port']['mac_learning_enabled'],
"Mac Learning is not enabled")
"Mac Learning is not enabled")
self.assertEqual(nsxport_mac_learning,
port['port']['mac_learning_enabled'],
"OS and NSX mac learn states don't match")
@ -1326,3 +1326,222 @@ class TestNewCase(feature_manager.FeatureManager):
loadbalancer_id=lb_id, protocol="TERMINATED_HTTPS",
protocol_port="443", name=listener_name,
default_tls_container_ref=container_ref)
@decorators.attr(type='nsxp')
@decorators.idempotent_id('3337127a-12ee-9014-b217-23455dbb45c3')
def test_check_nsx_security_group_rule_tag_N_S_E_W_traffic(self):
"""
Create security group and security group rule,
Check security group rule tag at backend,
Attach security group to vm port and check
traffic
"""
topology_dict = self.create_topo_single_network(
"test_secret", create_instance=False)
self.network_state = topology_dict['network_state']
self.subnet_state = topology_dict['subnet_state']
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=self.network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
nsx_dfw_section = self.nsx.get_firewall_section(
self.sg['name'], self.sg['id'], nsxp=True)
for rule in self.sg['security_group_rules']:
nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section,
rule['id'])
self.assertIsNotNone(nsx_dfw_rule)
self.assertEqual(nsx_dfw_rule['rule_tag'], rule['project_id'])
image_id = self.get_glance_image_id(['cirros', "esx"])
security_groups = [{'name': self.sg['name']}]
self.create_topology_instance(
"state_vm_1", [self.network_state],
create_floating_ip=True, image_id=image_id,
security_groups=security_groups, clients=self.cmgr_adm)
self.create_topology_instance(
"state_vm_2", [self.network_state],
create_floating_ip=True, image_id=image_id,
security_groups=security_groups, clients=self.cmgr_adm)
self.check_cross_network_connectivity(
self.network_state,
self.servers_details.get("state_vm_1").floating_ips[0],
self.servers_details.get("state_vm_1").server, should_connect=True)
self.check_cross_network_connectivity(
self.network_state,
self.servers_details.get("state_vm_2").floating_ips[0],
self.servers_details.get("state_vm_2").server, should_connect=True)
# Verify fip ping N-S traffic
self.verify_ping_own_fip(self.servers_details.get("state_vm_1").server)
self.verify_ping_own_fip(self.servers_details.get("state_vm_2").server)
@decorators.idempotent_id('3337127a-02dd-9014-b217-23455dbb45c3')
def test_create_subnet_with_default_transit_subnet_negative(self):
"""
Create subnet with default transit network
"100.64.0.0/16" it should fail
"""
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
network_state = self.create_topology_network(network_name)
self.assertRaises(exceptions.BadRequest, self.create_topology_subnet,
subnet_name, network_state, cidr='100.64.0.0/16')
@decorators.idempotent_id('3337127a-02dd-8905-b217-23455dbb45c3')
def test_create_subnet_with_transit_subnet_check_traffic(self):
"""
Create subnet with default transit network when different
transit network is given in tier0, it should create and
check traffic
"""
rtr_name = data_utils.rand_name(name='tempest-router')
kwargs = {}
router_state = self.create_topology_router(rtr_name,
set_gateway=True,
**kwargs)
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
network_state = self.create_topology_network(network_name)
self.create_topology_subnet(
subnet_name, network_state, cidr='100.70.0.0/16',
router_id=router_state["id"])
sec_rule_client = self.sec_rule_client
sec_client = self.sec_client
kwargs = dict(tenant_id=network_state['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
image_id = self.get_glance_image_id(['cirros', "esx"])
security_groups = [{'name': self.sg['name']}]
self.create_topology_instance(
"state_vm_1", [network_state],
create_floating_ip=True, image_id=image_id,
security_groups=security_groups, clients=self.cmgr_adm)
self.create_topology_instance(
"state_vm_2", [network_state],
create_floating_ip=True, image_id=image_id,
security_groups=security_groups, clients=self.cmgr_adm)
self.check_cross_network_connectivity(
network_state,
self.servers_details.get("state_vm_1").floating_ips[0],
self.servers_details.get("state_vm_1").server, should_connect=True)
self.check_cross_network_connectivity(
network_state,
self.servers_details.get("state_vm_2").floating_ips[0],
self.servers_details.get("state_vm_2").server, should_connect=True)
# Verify fip ping N-S traffic
self.verify_ping_own_fip(self.servers_details.get("state_vm_1").server)
self.verify_ping_own_fip(self.servers_details.get("state_vm_2").server)
@decorators.idempotent_id('3337127a-12ee-9016-b217-12344caa34b2')
def test_check_static_route_advertised_or_not(self):
"""
Check it should advertise static route with no-not
router
"""
kwargs = {"enable_snat": False}
router_state_1 = self.create_topology_router(set_gateway=True,
routers_client=self.
cmgr_adm.routers_client,
**kwargs)
network_lbaas_1 = self.create_topology_network(
"network_lbaas", networks_client=self.cmgr_adm.networks_client)
subnet_lbaas = self.create_topology_subnet(
"subnet_lbaas", network_lbaas_1, router_id=router_state_1["id"],
routers_client=self.cmgr_adm.routers_client,
subnets_client=self.cmgr_adm.subnets_client)
next_hop = subnet_lbaas['allocation_pools'][0]['end']
routes = [{
"destination": "40.0.0.0/24",
"nexthop": next_hop
}]
self.cmgr_adm.routers_client.update_router(
router_state_1['id'], routes=routes)
self.addCleanup(
self.cmgr_adm.routers_client.update_router,
router_state_1['id'], routes=[])
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if "40.0.0.0" in advertised_net[
'networks'][0]['network'] and\
advertised_net['networks'][
0]['advertiseRouteType'] == 'T1_STATIC' and\
advertised_net['networks'][0]['advertiseAllow']:
route_present = True
self.assertEqual(True, route_present, 'Static Route is not advertised')
self.cmgr_adm.routers_client.update_router(
router_state_1['id'], routes=[])
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if "40.0.0.0" in advertised_net[
'networks'][0]['network'] and\
advertised_net['networks'][
0]['advertiseRouteType'] == 'T1_STATIC' and\
advertised_net['networks'][0]['advertiseAllow']:
route_present = True
self.assertEqual(False, route_present, 'Static Route is advertised')
@decorators.idempotent_id('3337127a-02dd-8905-b217-12344caa34b2')
def test_check_static_route_advertised_or_not_snat_enabled_router(self):
"""
Check it should not advertise static route with snat
enabled router
"""
kwargs = {"enable_snat": True}
router_state_1 = self.create_topology_router(set_gateway=True,
routers_client=self.
cmgr_adm.routers_client,
**kwargs)
network_lbaas_1 = self.create_topology_network(
"network_lbaas", networks_client=self.cmgr_adm.networks_client)
subnet_lbaas = self.create_topology_subnet(
"subnet_lbaas", network_lbaas_1, router_id=router_state_1["id"],
routers_client=self.cmgr_adm.routers_client,
subnets_client=self.cmgr_adm.subnets_client)
next_hop = subnet_lbaas['allocation_pools'][0]['end']
routes = [{
"destination": "40.0.0.0/24",
"nexthop": next_hop
}]
self.cmgr_adm.routers_client.update_router(
router_state_1['id'], routes=routes)
self.addCleanup(self.cmgr_adm.routers_client.update_router,
router_state_1['id'], routes=[])
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_not_advertise = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if "40.0.0.0" in advertised_net[
'networks'][0]['network'] and\
advertised_net['networks'][
0]['advertiseRouteType'] == 'T1_STATIC' and\
advertised_net['networks'][0]['advertiseAllow'] is\
False:
route_not_advertise = True
self.assertEqual(True, route_not_advertise,
'Static Route is advertised')
self.cmgr_adm.routers_client.update_router(
router_state_1['id'], routes=[])
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if "40.0.0.0" in advertised_net[
'networks'][0]['network'] and\
advertised_net['networks'][
0]['advertiseRouteType'] == 'T1_STATIC':
route_present = True
self.assertEqual(False, route_present, 'Static Route is Present')