Merge "p0 final patches for all features and tempoest fixex"

This commit is contained in:
Zuul 2019-06-03 08:08:51 +00:00 committed by Gerrit Code Review
commit ef049d872f
14 changed files with 797 additions and 75 deletions

View File

@ -223,3 +223,15 @@ BarbicanGroup = [
default='',
help="barbican user id"),
]
nsx_edge_group = cfg.OptGroup(name='nsx_edge',
title="nsx_edge Configuration Options")
NsxEdgeGroup = [
cfg.ListOpt('nsx_edge_ip',
default='',
help="nsx edge ip list"),
cfg.StrOpt('edge_cluster_id',
default='',
help="edge cluster id"),
]

View File

@ -103,15 +103,19 @@ class ApplianceManager(manager.NetworkScenarioTest):
def get_server_key(self, server):
return self.topology_keypairs[server['key_name']]['private_key']
def create_topology_router(self, router_name, routers_client=None,
def create_topology_router(self, router_name=None, routers_client=None,
tenant_id=None, set_gateway=True,
ext_netid=None, **kwargs):
if not routers_client:
routers_client = self.routers_client
if not tenant_id:
tenant_id = routers_client.tenant_id
if router_name:
router_name_ = constants.APPLIANCE_NAME_STARTS_WITH + router_name
if router_name:
name = data_utils.rand_name(router_name_)
else:
name = data_utils.rand_name()
if CONF.network.backend == "nsxv3":
router = routers_client.create_router(
name=name, admin_state_up=True, tenant_id=tenant_id)['router']

View File

@ -17,6 +17,7 @@ import time
from neutron_lib import constants as nl_constants
from tempest.common.utils.linux import remote_client
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
@ -772,7 +773,8 @@ class FeatureManager(traffic_manager.IperfManager,
lb_id=None, count=None,
clean_up=None, persistence=False,
persistence_cookie_name=None,
persistence_type=None):
persistence_type=None,
create_fip=True, external_subnet=None):
count = 0
lb_name = None
session_persistence = {}
@ -783,9 +785,11 @@ class FeatureManager(traffic_manager.IperfManager,
session_persistence["cookie_name"] = persistence_cookie_name
if vip_subnet_id is None:
vip_subnet_id = self.topology_subnets["subnet_lbaas_1"]['id']
if external_subnet is not None:
vip_subnet_id = external_subnet
if lb_id is None:
lb_name = data_utils.rand_name(self.namestart)
if barbican:
if barbican or external_subnet:
self.loadbalancer = self.\
load_balancers_admin_client.\
create_load_balancer(name=lb_name,
@ -820,6 +824,18 @@ class FeatureManager(traffic_manager.IperfManager,
self.listener['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
elif external_subnet:
listener_name = data_utils.rand_name("tempest_lb")
self.listener = self.listeners_admin_client.\
create_listener(loadbalancer_id=lb_id, protocol=protocol_type,
protocol_port=protocol_port,
name=listener_name)['listener']
if clean_up is None:
self.addCleanup(
self.listeners_admin_client.delete_listener,
self.listener['id'])
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
else:
self.listener = self.listeners_client.create_listener(
loadbalancer_id=lb_id, protocol=protocol_type,
@ -830,14 +846,12 @@ class FeatureManager(traffic_manager.IperfManager,
self.pool = self.pools_admin_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=pool_protocol,
name=lb_name,
session_persistence=session_persistence)['pool']
name=lb_name)['pool']
else:
self.pool = self.pools_admin_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=pool_protocol,
name=lb_id,
session_persistence=session_persistence)['pool']
name=lb_id)['pool']
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
pool_id = self.pool['id']
@ -845,6 +859,17 @@ class FeatureManager(traffic_manager.IperfManager,
self.addCleanup(self.pools_admin_client.delete_pool, pool_id)
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
elif external_subnet:
self.pool = self.pools_admin_client.create_pool(
listener_id=self.listener['id'],
lb_algorithm=lb_algorithm, protocol=protocol_type,
name=lb_name,
session_persistence=session_persistence)['pool']
pool_id = self.pool['id']
if clean_up is None:
self.addCleanup(self.pools_admin_client.delete_pool, pool_id)
self.load_balancers_admin_client.wait_for_load_balancer_status(
lb_id)
else:
self.pool = self.pools_client.create_pool(
listener_id=self.listener['id'],
@ -853,8 +878,7 @@ class FeatureManager(traffic_manager.IperfManager,
session_persistence=session_persistence)['pool']
self.wait_for_load_balancer_status(lb_id)
pool_id = self.pool['id']
if barbican:
if barbican or external_subnet:
self.healthmonitor = (
self.health_monitors_admin_client.create_health_monitor(
pool_id=pool_id, type=hm_type,
@ -876,19 +900,25 @@ class FeatureManager(traffic_manager.IperfManager,
self.members = []
for server_name in self.topology_servers.keys():
if count < member_count:
fip_data = self.servers_details[server_name].floating_ips[0]
if create_fip:
fip_data = self.servers_details[server_name].\
floating_ips[0]
fixed_ip_address = fip_data['fixed_ip_address']
if fip_disassociate is None:
if barbican:
if barbican or external_subnet:
kwargs = dict(port_id=None)
self.cmgr_adm.floating_ips_client.\
update_floatingip(fip_data['id'],
**kwargs)['floatingip']
else:
self._disassociate_floating_ip(fip_data)
else:
net_name = self.servers_details[server_name][2][0]['name']
fixed_ip_address = self.servers_details[
server_name][0]['addresses'][net_name][0]['addr']
if weight:
weight += count
if barbican:
if barbican or external_subnet:
member = self.members_admin_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
@ -906,17 +936,22 @@ class FeatureManager(traffic_manager.IperfManager,
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=pool_port)['member']
elif external_subnet:
member = self.members_admin_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=protocol_port)
else:
member = self.members_client.create_member(
pool_id, subnet_id=vip_subnet_id,
address=fixed_ip_address,
protocol_port=protocol_port)
if barbican:
if barbican or external_subnet:
self.load_balancers_admin_client.\
wait_for_load_balancer_status(lb_id)
else:
self.wait_for_load_balancer_status(lb_id)
if barbican:
if barbican or external_subnet:
if clean_up is None:
self.addCleanup(
self.members_admin_client.delete_member,
@ -928,7 +963,7 @@ class FeatureManager(traffic_manager.IperfManager,
else:
break
if not CONF.nsxv3.ens:
if barbican:
if barbican or external_subnet:
self.cmgr_adm.ports_client.update_port(
self.loadbalancer['vip_port_id'],
security_groups=[self.sg['id']])
@ -937,7 +972,7 @@ class FeatureManager(traffic_manager.IperfManager,
self.loadbalancer['vip_port_id'],
security_groups=[self.sg['id']])
# create lbaas public interface
if barbican:
if barbican or external_subnet:
if not hasattr(self, 'vip_ip_address'):
self.cmgr_adm.ports_client.update_port(
self.loadbalancer['vip_port_id'],
@ -955,11 +990,14 @@ class FeatureManager(traffic_manager.IperfManager,
members=self.members,
listener_id=self.listener['id'])
else:
if create_fip:
vip_fip = \
self.create_floatingip(self.loadbalancer,
port_id=self.loadbalancer['vip_port_id']
)
port_id=self.loadbalancer[
'vip_port_id'])
self.vip_ip_address = vip_fip['floating_ip_address']
else:
self.vip_ip_address = self.loadbalancer['vip_address']
pools = self.pools_client.show_pool(
self.pool['id'])
return dict(lb_id=lb_id, pool=pools,
@ -1497,3 +1535,25 @@ class FeatureManager(traffic_manager.IperfManager,
pool_id=pool_id,
members=self.members,
listener_id=self.listener['id'])
def check_router_components_on_edge(self, router):
edge_ips = CONF.nsx_edge.nsx_edge_ip
nsx_dr_rtr_name = "DR-" + router['name']
dr_present = False
nsx_sr_rtr_name = "SR-" + router['name']
sr_present = False
for nsx_edge_ip in edge_ips:
ssh_client = remote_client.RemoteClient(
nsx_edge_ip, 'root', 'Admin!23Admin')
command = "nsxcli -c get logical-router | awk {'print $4'}"
data = ssh_client.exec_command(command)
result = data.split('\n')
present = False
present = [True for el in result if nsx_sr_rtr_name in el]
if present:
sr_present = True
present = False
present = [True for el in result if nsx_dr_rtr_name in el]
if present:
dr_present = True
return[{'dr_present': dr_present}, {'sr_present': sr_present}]

View File

@ -29,7 +29,8 @@ _opts = [
(config_nsx.l2gw_group, config_nsx.L2gwGroup),
(config_nsx.nsxv3_group, config_nsx.NSXv3Group),
(config_nsx.dns_group, config_nsx.DNSGroup),
(config_nsx.barbican_group, config_nsx.BarbicanGroup)
(config_nsx.barbican_group, config_nsx.BarbicanGroup),
(config_nsx.nsx_edge_group, config_nsx.NsxEdgeGroup)
]

View File

@ -231,6 +231,16 @@ class NSXPClient(object):
endpoint = "tier-1s"
return self.get_logical_resources(endpoint)
def get_logical_router_local_services(self, os_name, os_uuid):
"""
Retrieve all the logical routers based on router type. If tier
is None, it will return all logical routers.
"""
lrouter = self.get_logical_router(os_name, os_uuid)
router_id = lrouter['id']
return self.get_logical_resources(
'tier-1s/%s/locale-services' % router_id)
def get_logical_router(self, os_name, os_uuid):
"""
Get the logical router based on the os_name and os_uuid provided.

View File

@ -178,6 +178,9 @@ class NSXV3Client(object):
cursor = res_json.get("cursor")
if res_json.get("results"):
results.extend(res_json["results"])
else:
if res_json:
return res_json
while cursor:
page = self.get(endpoint=endpoint, cursor=cursor).json()
results.extend(page.get("results", []))
@ -472,6 +475,24 @@ class NSXV3Client(object):
lrouters = self.get_logical_routers()
return self.get_nsx_resource_by_name(lrouters, nsx_name)
def get_logical_router_nat_rule_ips(self, os_name, os_uuid):
"""
Get the logical router based on the os_name and os_uuid provided.
The name of the logical router shoud follow
<os_router_name>_<starting_5_uuid>...<trailing_5_uuid>
Return the logical router if found, otherwise return None.
"""
if not os_name or not os_uuid:
LOG.error("Name and uuid of OS router should be present "
"in order to query backend logical router created")
return None
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
lrouters = self.get_logical_routers()
nsx_router = self.get_nsx_resource_by_name(lrouters, nsx_name)
endpoint = "/logical-routers/%s/"\
"debug-info?format=text" % nsx_router['id']
return self.get_logical_resources(endpoint)
def get_logical_router_ports(self, lrouter):
"""
Get all logical ports attached to lrouter
@ -500,6 +521,17 @@ class NSXV3Client(object):
response = self.get(endpoint)
return response.json()
def get_logical_router_advertisement_rules(self, lrouter):
"""Get logical router advertisement"""
if not lrouter:
LOG.error("Logical router needs to be present in order "
"to get router advertisement!")
return None
endpoint = "/logical-routers/%s/"\
"routing/advertisement/rules" % lrouter['id']
response = self.get(endpoint)
return response.json()
def get_logical_dhcp_servers(self):
"""
Get all logical DHCP servers on NSX backend

View File

@ -92,6 +92,10 @@ class BaseTestCase(base.BaseNetworkTest):
router_cfg = dict(
router_name=router_name,
external_network_id=CONF.network.public_network_id)
if CONF.network.backend == "nsxp":
router_cfg = dict(
router_name=router_name,
external_network_id=CONF.network.public_network_id)
else:
router_cfg = dict(router_name=router_name, router_type='exclusive')
if NO_ROUTER_TYPE:

View File

@ -20,9 +20,10 @@ from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
@ -61,6 +62,9 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.network = cls.create_network()
def delete_security_group(self, sg_client, sg_id):
@ -108,7 +112,8 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
sg = sg_client.show_security_group(security_group_id)
return sg.get('security_group', sg)
def _wait_till_firewall_gets_realize(self, secgroup, dfw_error_msg=""):
def _wait_till_firewall_gets_realize(self, secgroup, dfw_error_msg="",
tenant_id=None):
nsx_firewall_time_counter = 0
nsx_dfw_section = None
# wait till timeout or till dfw section
@ -116,10 +121,29 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
constants.NSX_FIREWALL_REALIZED_TIMEOUT and \
not nsx_dfw_section:
nsx_firewall_time_counter += 1
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_nsgroup_policy = self.nsxp.get_ns_group(
secgroup['name'], secgroup['id'],
os_tenant_id=tenant_id)
nsx_dfw_section_policy = self.nsxp.get_firewall_section(
secgroup['name'], secgroup['id'],
os_tenant_id=tenant_id)
nsx_nsgroup = self.nsx.get_ns_group(
secgroup['name'], secgroup['id'], nsxp=True,
os_tenant_id=tenant_id)
nsx_dfw_section = self.nsx.get_firewall_section(
secgroup['name'], secgroup['id'], nsxp=True)
self.assertIsNotNone(nsx_nsgroup_policy)
self.assertIsNotNone(nsx_dfw_section_policy,
dfw_error_msg)
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section, dfw_error_msg)
else:
nsx_nsgroup = self.nsx.get_ns_group(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(secgroup['name'],
secgroup['id'])
nsx_dfw_section = self.nsx.get_firewall_section(
secgroup['name'], secgroup['id'])
time.sleep(constants.ONE_SEC)
self.assertIsNotNone(nsx_nsgroup)
self.assertIsNotNone(nsx_dfw_section, dfw_error_msg)
@ -135,7 +159,13 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
protocol='icmp')
sg_rule.get('id')
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
self.assertNotEmpty(self.nsx.get_firewall_section(sg_name, sg_id))
if CONF.network.backend == 'nsxp':
self.assertNotEmpty(self.nsxp.get_firewall_section(
sg_name, sg_id, 'default'))
self.assertNotEmpty(self.nsx.get_firewall_section(sg_name, sg_id,
nsxp=True))
else:
self.assertNotEmpty(self.nsxp.get_firewall_section(sg_name, sg_id))
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('2c8d013d-4c0b-4d2b-b77c-779351a789ce')
@ -199,8 +229,18 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
protocol='icmp')
sg_rule.get('id')
dfw_error_msg = "Firewall section not found for %s!" % sg_name
self._wait_till_firewall_gets_realize(sg, dfw_error_msg)
self._wait_till_firewall_gets_realize(sg, dfw_error_msg,
tenant_id='default')
if CONF.network.backend == 'nsxp':
firewall_section = self.nsxp.get_firewall_section(
sg_name, sg_id, 'default')
else:
firewall_section = self.nsx.get_firewall_section(sg_name, sg_id)
if CONF.network.backend == 'nsxp':
output = self.nsxp.get_firewall_section_rules(
firewall_section,
tenant_id='default')
else:
output = self.nsx.get_firewall_section_rules(firewall_section)
self.assertEqual('DROP', output[0]['action'])
@ -519,6 +559,10 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
while nsx_firewall_time_counter < \
constants.NSX_FIREWALL_REALIZED_TIMEOUT and not provider_sec:
nsx_firewall_time_counter += 1
if CONF.network.backend == 'nsxp':
firewall_sections = self.nsxp.get_firewall_sections(
tenant_id='default')
else:
firewall_sections = self.nsx.get_firewall_sections()
for section in firewall_sections:
if provider_sg_name in section['display_name']:
@ -529,6 +573,7 @@ class ProviderSecurityGroupTest(base.BaseAdminNetworkTest):
# when execute tempest in parallel fashion,
# we create provider security group for other tests,
# NSX will return all provider security group from DFW.
if CONF.network.backend != 'nsxp':
if section['applied_tos'][0]['target_type'] == "LogicalRouter":
continue
if PROVIDER_SECURITY_GRP in section['display_name'] and \

View File

@ -186,7 +186,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
self.network = self._create_network()
self.subnet = self._create_subnet(
self.network, cidr=CONF.network.project_network_cidr)
#check backend if the network was created
# check backend if the network was created
msg = 'network %s not found' % self.network['name']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
@ -215,7 +215,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
"""
self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
#try to update network name as NSX admin
# try to update network name as NSX admin
data.update({"display_name": "nsx_modified_switch"})
response = self.nsx.ca_put_request(component='segments',
comp_id=data['id'],
@ -234,7 +234,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
self.network = self._create_network()
self.subnet = self._create_subnet(
self.network, cidr=CONF.network.project_network_cidr)
#create router and add an interface
# create router and add an interface
self.router = self._create_router(
router_name=data_utils.rand_name('router-cert-mgmt'),
external_network_id=CONF.network.public_network_id)
@ -242,7 +242,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
self.router['id'], subnet_id=self.subnet['id'])
self.addCleanup(self.routers_client.remove_router_interface,
self.router['id'], subnet_id=self.subnet['id'])
#check backend if the router was created
# check backend if the router was created
msg = 'router %s not found' % self.router['name']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
@ -269,14 +269,14 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
"""
self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
#Obtain any router port corresponding to the logical router
# Obtain any router port corresponding to the logical router
rtr_ports = self.nsx.get_logical_router_ports(data)
#try to update router name as NSX admin
# try to update router name as NSX admin
data.update({"display_name": "nsx_modified_router"})
response = self.nsx.ca_put_request(component='logical-routers',
comp_id=data['id'], body=data)
self.parse_response(response)
#try to delete logical router port as NSX admin
# try to delete logical router port as NSX admin
if len(rtr_ports) != 0:
response = self.nsx.ca_delete_request(
component='logical-router-ports',
@ -297,7 +297,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#obtain all switching profiles at the backend
# obtain all switching profiles at the backend
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_policy = self.nsxp.get_qos_profile(policy['name'],
@ -311,7 +311,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
+ "..." + policy['id'][-5:]
nsx_policy_v3 = self.nsx.get_nsx_resource_by_name(qos_policies,
nsx_name)
#check backend if the qos policy was created
# check backend if the qos policy was created
msg = 'Qos policy %s not found' % policy['name']
self.assertIsNotNone(self.nsx.get_switching_profile(
nsx_policy_v3['id']), msg)
@ -335,7 +335,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
qos_policies = self.nsx.get_switching_profiles()
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
policy['name'])
#check backend if the qos policy was created
# check backend if the qos policy was created
msg = 'Qos policy %s not found' % policy['name']
self.assertIsNotNone(self.nsx.get_switching_profile(
nsx_policy['id']), msg)
@ -346,12 +346,12 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
"""
self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
#try to update qos policy as NSX admin
# try to update qos policy as NSX admin
data.update({"display_name": "nsx_modified_qos-policy"})
response = self.nsx.ca_put_request(component='switching-profiles',
comp_id=data['id'], body=data)
self.parse_response(response)
#try to delete qos policy as NSX admin
# try to delete qos policy as NSX admin
response = self.nsx.ca_delete_request(
component='switching-profiles', comp_id=data['id'])
self.parse_response(response)
@ -366,34 +366,34 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
Verify if NSX admin can not delete the firewall
"""
self.security_group = self._create_security_group()
#check backend if the firewall section was created
# check backend if the firewall section was created
msg = 'Security group %s not found' % self.security_group['name']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNotNone(self.nsxp.get_firewall_section(
self.security_group['name'], self.security_group['id'],
os_tenant_id=self.security_group['tenant_id']), msg)
os_tenant_id='default'), msg)
self.assertIsNotNone(self.nsx.get_firewall_section(
self.security_group['name'],
self.security_group['id'], nsxp=True), msg)
data = self.nsxp.get_firewall_section(
self.security_group['name'],
self.security_group['id'],
os_tenant_id=self.security_group['tenant_id'])
os_tenant_id='default')
self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
fw_rules = self.nsxp.get_firewall_section_rules(
data,
tenant_id=self.security_group['tenant_id'])
tenant_id='default')
data.update({"display_name": "nsx_modified_security_group"})
response = self.nsxp.ca_put_request(
component='domains/%s/security-policies' % (
self.security_group['tenant_id']),
'default'),
comp_id=data['id'], body=data)
self.parse_response(response)
if len(fw_rules) != 0:
component = 'domains/%s/security-policies/%s/rules' % (
self.security_group['tenant_id'], data['id'])
'default', data['id'])
response = self.nsxp.ca_delete_request(
component=component, comp_id=fw_rules[0]['id'])
self.parse_response(response)
@ -409,14 +409,14 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
"""
self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
#obtain firewall rules related to the security group
# obtain firewall rules related to the security group
fw_rules = self.nsx.get_firewall_section_rules(data)
#try to update security group as NSX admin
# try to update security group as NSX admin
data.update({"display_name": "nsx_modified_security_group"})
response = self.nsx.ca_put_request(component='firewall/sections',
comp_id=data['id'], body=data)
self.parse_response(response)
#try to delete logical firewall rule as NSX admin
# try to delete logical firewall rule as NSX admin
if len(fw_rules) != 0:
component = 'firewall/sections/' + data['id'] + '/rules'
response = self.nsx.ca_delete_request(
@ -440,14 +440,14 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
data = data['nsx_port']
self.assertEqual(data['_create_user'], self.openstack_tag,
'Incorrect tag for the create user')
#try to update logical port as NSX admin
# try to update logical port as NSX admin
data.update({"display_name": "nsx_modified_logical_port"})
if CONF.network.backend == 'nsxp':
response = self.nsxp.ca_put_request(
component='segments/%s/ports' % nsx_network['id'],
comp_id=data['id'], body=data)
self.parse_response(response)
#try to delete logical port as NSX admin
# try to delete logical port as NSX admin
response = self.nsxp.ca_delete_request(
component='segments/%s/ports' % nsx_network['id'],
comp_id=data['id'])
@ -456,7 +456,7 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
response = self.nsx.ca_put_request(component='logical-ports',
comp_id=data['id'], body=data)
self.parse_response(response)
#try to delete logical port as NSX admin
# try to delete logical port as NSX admin
response = self.nsx.ca_delete_request(component='logical-ports',
comp_id=data['id'])
self.parse_response(response)
@ -483,16 +483,16 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
endpoint = ("qos-profiles/%s" % data['id'])
response = self.nsxp.delete_super_admin(endpoint)
else:
#obtain all switching profiles at the backend
# obtain all switching profiles at the backend
qos_policies = self.nsx.get_switching_profiles()
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
policy['name'])
#check backend if the qos policy was created
# check backend if the qos policy was created
msg = 'Qos policy %s not found' % policy['name']
self.assertIsNotNone(self.nsx.get_switching_profile(
nsx_policy['id']), msg)
data = self.nsx.get_switching_profile(nsx_policy['id'])
#try to delete qos policy as NSX admin
# try to delete qos policy as NSX admin
endpoint = ("/%s/%s" % ('switching-profiles',
data['id']))
response = self.nsx.delete_super_admin(endpoint)

View File

@ -129,7 +129,7 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
instance_fixed_ip = instance_addr[0]["addr"]
port_id = None
for port in self._list_ports():
if "fixed_ips" in port:
if "fixed_ips" in port and len(port["fixed_ips"]) > 0:
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
if port["network_id"] == network_id and port["fixed_ips"][0][
"subnet_id"] == subnet_id and instance["id"] == port[

View File

@ -136,7 +136,7 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
instance_fixed_ip = instance_addr[0]["addr"]
port_id = None
for port in self._list_ports():
if "fixed_ips" in port:
if "fixed_ips" in port and len(port["fixed_ips"]) > 0:
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
if port["network_id"] == network_id and port["fixed_ips"][0][
"subnet_id"] == subnet_id and instance["id"] == port[

View File

@ -0,0 +1,206 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestLBVipRoute(feature_manager.FeatureManager):
"""Test TestLBVipRoute
Adding test cases to test deploy tier1
on sepcific edge_cluster.
"""
def setUp(self):
super(TestLBVipRoute, self).setUp()
self.cmgr_adm = self.get_client_manager('admin')
self.cmgr_alt = self.get_client_manager('alt')
self.cmgr_adm = self.get_client_manager('admin')
self.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
CONF.validation.ssh_shell_prologue = ''
self.vip_ip_address = ''
self.namestart = 'lbaas-ops'
self.poke_counters = 12
self.hm_delay = 4
self.hm_max_retries = 3
self.hm_timeout = 10
self.server_names = []
self.loadbalancer = None
self.vip_fip = None
self.web_service_start_delay = 2.5
@classmethod
def skip_checks(cls):
"""Class level skip checks.
Class level check. Skip all the MDproxy tests, if native_dhcp_metadata
is not True under nsxv3 section of the config
"""
super(TestLBVipRoute, cls).skip_checks()
@decorators.idempotent_id('2317349c-02dd-0016-c228-98844caa46c3')
def test_lb_vip_route_with_tenant_net(self):
"""
Check lb vip route should not present on tier1
if floating ip is not created.
"""
kwargs = {"enable_snat": True}
router_state_1 = self.create_topology_router(set_gateway=True,
**kwargs)
network_lbaas_1 = self.create_topology_network("network_lbaas")
sec_rule_client = self.manager.security_group_rules_client
sec_client = self.manager.security_groups_client
kwargs = dict(tenant_id=network_lbaas_1['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
lbaas_rules = [dict(direction='ingress', protocol='tcp',
port_range_min=constants.HTTP_PORT,
port_range_max=constants.HTTP_PORT, ),
dict(direction='ingress', protocol='tcp',
port_range_min=443, port_range_max=443, )]
for rule in lbaas_rules:
self.add_security_group_rule(self.sg, rule)
subnet_lbaas = self.create_topology_subnet(
"subnet_lbaas", network_lbaas_1, router_id=router_state_1["id"])
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [network_lbaas_1],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, create_floating_ip=False)
lb_resource = self.create_project_lbaas(protocol_type="HTTP",
protocol_port="80",
vip_subnet_id=subnet_lbaas[
'id'],
lb_algorithm="ROUND_ROBIN",
hm_type='PING',
create_fip=False,
clean_up=True)
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if lb_resource['vip_ip'] in \
advertised_net['networks'][0]['network']:
route_present = True
self.assertEqual(False, route_present, 'Lb vip route is advertised')
vip_fip = self.create_floatingip(
self.loadbalancer,
client=self.cmgr_adm.floating_ips_client,
port_id=self.loadbalancer['vip_port_id'])
self.vip_ip_address = vip_fip['floating_ip_address']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if self.vip_ip_address in\
advertised_net['networks'][0]['network']:
route_present = True
self.assertEqual(True, route_present, 'Lb vip route is not advertised')
kwargs = dict(port_id=None)
self.cmgr_adm.floating_ips_client.\
update_floatingip(vip_fip['id'],
**kwargs)['floatingip']
self.delete_loadbalancer_resources(lb_resource['lb_id'])
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if lb_resource['vip_ip'] in\
advertised_net['networks'][0]['network']:
route_present = True
self.assertEqual(False, route_present, 'Lb vip route is advertised')
@decorators.idempotent_id('2317349c-02cc-1127-d339-09955dbb47d4')
def test_lb_vip_route_with_external_net(self):
"""
Check lb vip vip route should be present with external net
"""
kwargs = {"enable_snat": True}
router_state_1 = self.create_topology_router(set_gateway=True,
routers_client=self.
cmgr_adm.routers_client,
**kwargs)
network_lbaas_1 = self.create_topology_network(
"network_lbaas", networks_client=self.cmgr_adm.networks_client)
sec_rule_client = self.cmgr_adm.security_group_rules_client
sec_client = self.cmgr_adm.security_groups_client
kwargs = dict(tenant_id=network_lbaas_1['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
lbaas_rules = [dict(direction='ingress', protocol='tcp',
port_range_min=constants.HTTP_PORT,
port_range_max=constants.HTTP_PORT, ),
dict(direction='ingress', protocol='tcp',
port_range_min=443, port_range_max=443, )]
for rule in lbaas_rules:
self.add_security_group_rule(
self.sg,
rule,
ruleclient=sec_rule_client,
secclient=sec_client,
tenant_id=network_lbaas_1['tenant_id'])
subnet_lbaas = self.create_topology_subnet(
"subnet_lbaas", network_lbaas_1,
subnets_client=self.cmgr_adm.subnets_client,
routers_client=self.cmgr_adm.routers_client,
router_id=router_state_1["id"])
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [network_lbaas_1],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, create_floating_ip=False,
clients=self.cmgr_adm)
network = self.cmgr_adm.networks_client.show_network(
CONF.network.public_network_id)['network']
lb_resource = self.create_project_lbaas(
protocol_type="HTTP", protocol_port="80",
vip_subnet_id=subnet_lbaas['id'],
lb_algorithm="ROUND_ROBIN", hm_type='PING',
create_fip=False, clean_up=True,
external_subnet=network['subnets'][0])
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_router_nat_rules = self.nsx.get_logical_router_nat_rule_ips(
router_state_1['name'], router_state_1['id'])
route_present = False
for advertised_net in nsx_router_nat_rules['advertisedNetworks']:
if len(advertised_net['networks']) > 0:
if lb_resource['vip_ip'] in\
advertised_net['networks'][0]['network']:
route_present = True
self.assertEqual(True, route_present, 'Lb vip route is not advertised')

View File

@ -0,0 +1,178 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestTier1DRComponentDeployment(feature_manager.FeatureManager):
"""Test TestTier1DRComponentDeployment
Adding test cases to test deploy tier1
on sepcific edge_cluster.
"""
def setUp(self):
super(TestTier1DRComponentDeployment, self).setUp()
self.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
CONF.validation.ssh_shell_prologue = ''
self.vip_ip_address = ''
self.namestart = 'lbaas-ops'
self.poke_counters = 12
self.hm_delay = 4
self.hm_max_retries = 3
self.hm_timeout = 10
self.server_names = []
self.loadbalancer = None
self.vip_fip = None
self.web_service_start_delay = 2.5
@classmethod
def skip_checks(cls):
"""Class level skip checks.
Class level check. Skip all the MDproxy tests, if native_dhcp_metadata
is not True under nsxv3 section of the config
"""
super(TestTier1DRComponentDeployment, cls).skip_checks()
def create_topo_single_network(self, namestart, create_instance=True,
set_gateway=True, instance_count=None,
**kwargs):
"""
Create Topo where 1 logical switches which is
connected via tier-1 router.
"""
rtr_name = data_utils.rand_name(name='tempest-router')
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
router_state = self.create_topology_router(rtr_name,
set_gateway=set_gateway,
**kwargs)
network_state = self.create_topology_network(network_name)
subnet_state = self.create_topology_subnet(subnet_name, network_state,
router_id=router_state["id"]
)
if create_instance:
image_id = self.get_glance_image_id(["cirros", "esx"])
self.create_topology_instance(
"state_vm_1", [network_state],
create_floating_ip=True, image_id=image_id)
self.create_topology_instance(
"state_vm_2", [network_state],
create_floating_ip=True, image_id=image_id)
topology_dict = dict(router_state=router_state,
network_state=network_state,
subnet_state=subnet_state)
return topology_dict
@decorators.idempotent_id('1206238b-91cc-0987-b217-09955dbb58d4')
def test_only_dr_componet_of_router_should_present(self):
"""
Check only dr component of router should be present
on edge
"""
kwargs = {"enable_snat": False}
router_state = self.create_topology_router(set_gateway=True,
**kwargs)
result = self.check_router_components_on_edge(router_state)
self.assertEqual(True, result[0]['dr_present'])
self.assertEqual(False, result[1]['sr_present'])
@decorators.idempotent_id('1206238b-02dd-1098-c228-09955dbb58d4')
def test_tier1_sr_component_should_present(self):
"""
Check sr and dr component of router should be present
on edge
"""
kwargs = {"enable_snat": False}
router_state = self.create_topology_router(set_gateway=True,
**kwargs)
result = self.check_router_components_on_edge(router_state)
self.assertEqual(True, result[0]['dr_present'])
self.assertEqual(False, result[1]['sr_present'])
public_network_info = {"external_gateway_info": dict(
network_id=CONF.network.public_network_id)}
self.routers_client.update_router(router_state['id'],
**public_network_info)
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
network_state = self.create_topology_network(network_name)
self.create_topology_subnet(subnet_name, network_state,
router_id=router_state["id"])
result = self.check_router_components_on_edge(router_state)
self.assertEqual(True, result[0]['dr_present'])
self.assertEqual(True, result[1]['sr_present'])
@decorators.idempotent_id('1206238b-02dd-1098-c228-10066ecc69e5')
def test_tier1_sr_should_create_when_service_is_enabled(self):
"""
Check sr and dr component of router should be present
on edge when any service is enable
"""
kwargs = {"enable_snat": False}
router_state_1 = self.create_topology_router(set_gateway=True,
**kwargs)
network_lbaas_1 = self.create_topology_network("network_lbaas")
sec_rule_client = self.manager.security_group_rules_client
sec_client = self.manager.security_groups_client
kwargs = dict(tenant_id=network_lbaas_1['tenant_id'],
security_group_rules_client=sec_rule_client,
security_groups_client=sec_client)
self.sg = self.create_topology_security_group(**kwargs)
lbaas_rules = [dict(direction='ingress', protocol='tcp',
port_range_min=constants.HTTP_PORT,
port_range_max=constants.HTTP_PORT, ),
dict(direction='ingress', protocol='tcp',
port_range_min=443, port_range_max=443, )]
for rule in lbaas_rules:
self.add_security_group_rule(self.sg, rule)
subnet_lbaas = self.create_topology_subnet(
"subnet_lbaas", network_lbaas_1, router_id=router_state_1["id"])
no_of_servers = 2
image_id = self.get_glance_image_id(["cirros", "esx"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [network_lbaas_1],
security_groups=[{'name': self.sg['name']}],
image_id=image_id, create_floating_ip=False)
result = self.check_router_components_on_edge(router_state_1)
self.assertEqual(True, result[0]['dr_present'])
self.assertEqual(False, result[1]['sr_present'])
lb_resource = self.create_project_lbaas(protocol_type="HTTP",
protocol_port="80",
vip_subnet_id=subnet_lbaas[
'id'],
lb_algorithm="ROUND_ROBIN",
hm_type='PING',
create_fip=False)
result = self.check_router_components_on_edge(router_state_1)
self.assertEqual(True, result[0]['dr_present'])
self.assertEqual(True, result[1]['sr_present'])
self.delete_loadbalancer_resources(lb_resource['lb_id'])
result = self.check_router_components_on_edge(router_state_1)
self.assertEqual(True, result[0]['dr_present'])
self.assertEqual(False, result[1]['sr_present'])

View File

@ -0,0 +1,170 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestTier1DeploymentEdgeCluster(feature_manager.FeatureManager):
"""Test TestTier1DeploymentEdgeCluster
Adding test cases to test deploy tier1
on sepcific edge_cluster.
"""
def setUp(self):
super(TestTier1DeploymentEdgeCluster, self).setUp()
self.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
self.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
@classmethod
def skip_checks(cls):
"""Class level skip checks.
Class level check. Skip all the MDproxy tests, if native_dhcp_metadata
is not True under nsxv3 section of the config
"""
super(TestTier1DeploymentEdgeCluster, cls).skip_checks()
def create_topo_single_network(self, namestart, create_instance=True,
set_gateway=True, instance_count=None,
**kwargs):
"""
Create Topo where 1 logical switches which is
connected via tier-1 router.
"""
rtr_name = data_utils.rand_name(name='tempest-router')
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
router_state = self.create_topology_router(rtr_name,
set_gateway=set_gateway,
**kwargs)
network_state = self.create_topology_network(network_name)
subnet_state = self.create_topology_subnet(subnet_name, network_state,
router_id=router_state["id"]
)
if create_instance:
image_id = self.get_glance_image_id(["cirros", "esx"])
self.create_topology_instance(
"state_vm_1", [network_state],
create_floating_ip=True, image_id=image_id)
self.create_topology_instance(
"state_vm_2", [network_state],
create_floating_ip=True, image_id=image_id)
topology_dict = dict(router_state=router_state,
network_state=network_state,
subnet_state=subnet_state)
return topology_dict
def verify_ping_to_fip_from_ext_vm(self, server_details):
self.test_fip_check_server_and_project_network_connectivity(
server_details)
def verify_ping_own_fip(self, server):
fip = server["floating_ips"][0]["floating_ip_address"]
client = self.verify_server_ssh(server, floating_ip=fip)
ping_cmd = "ping -c 1 %s " % fip
self.exec_cmd_on_server_using_fip(ping_cmd, ssh_client=client)
@decorators.idempotent_id('1206238b-91cc-8905-b217-87733dab35b4')
def test_tier1_router_on_edge_cluster(self):
"""
Check it should not allow to create port with two
fixed ips.
"""
rtr_name = data_utils.rand_name(name='tempest-router')
kwargs = {}
router_state = self.create_topology_router(rtr_name,
set_gateway=True,
**kwargs)
if CONF.network.backend == 'nsxp':
router_services = self.nsxp.get_logical_router_local_services(
router_state['name'], router_state['id'])
edge_cluster_id = router_services[0]['edge_cluster_path'].\
split('/')[len(router_services[0]['edge_cluster_path'].
split('/')) - 1]
else:
router_services = self.nsx.get_logical_router(
router_state['name'], router_state['id'])
edge_cluster_id = router_services['edge_cluster_id']
self.assertEqual(CONF.nsx_edge.edge_cluster_id, edge_cluster_id)
@decorators.idempotent_id('2317349d-91cc-8905-b217-98844caa46c3')
def test_east_west_traffic_with_specified_edge_cluster_for_tier(self):
"""
Check it should not allow to create port with two
fixed ips.
"""
topology_dict = self.create_topo_single_network("admin_state")
router_state = topology_dict['router_state']
network_state = topology_dict['network_state']
if CONF.network.backend == 'nsxp':
router_services = self.nsxp.get_logical_router_local_services(
router_state['name'], router_state['id'])
edge_cluster_id = router_services[0]['edge_cluster_path'].\
split('/')[len(router_services[0]['edge_cluster_path'].
split('/')) - 1]
else:
router_services = self.nsx.get_logical_router(
router_state['name'], router_state['id'])
edge_cluster_id = router_services['edge_cluster_id']
self.assertEqual(CONF.nsx_edge.edge_cluster_id, edge_cluster_id)
# Verify E-W traffic
self.check_cross_network_connectivity(
network_state,
self.servers_details.get("state_vm_1").floating_ips[0],
self.servers_details.get("state_vm_1").server, should_connect=True)
self.check_cross_network_connectivity(
network_state,
self.servers_details.get("state_vm_2").floating_ips[0],
self.servers_details.get("state_vm_2").server, should_connect=True)
@decorators.idempotent_id('2317349d-91cc-8905-b217-09955caa46c3')
def test_north_south_traffic_with_specified_edge_cluster_for_tier1(self):
"""
Check it should not allow to create port with two
fixed ips.
"""
topology_dict = self.create_topo_single_network("admin_state")
router_state = topology_dict['router_state']
if CONF.network.backend == 'nsxp':
router_services = self.nsxp.get_logical_router_local_services(
router_state['name'], router_state['id'])
edge_cluster_id = router_services[0]['edge_cluster_path'].\
split('/')[len(router_services[0]['edge_cluster_path'].
split('/')) - 1]
else:
router_services = self.nsx.get_logical_router(
router_state['name'], router_state['id'])
edge_cluster_id = router_services['edge_cluster_id']
self.assertEqual(CONF.nsx_edge.edge_cluster_id, edge_cluster_id)
# Verify fip ping N-S traffic
for server, details in self.servers_details.items():
self.verify_ping_to_fip_from_ext_vm(details)
self.verify_ping_own_fip(self.topology_servers["state_vm_1"])
self.verify_ping_own_fip(self.topology_servers["state_vm_2"])