73e049169a
Change-Id: I7afe69cacc3e3897a296280bf74bcef41978819f
697 lines
30 KiB
Python
697 lines
30 KiB
Python
# Copyright 2017 VMware Inc
|
|
# All Rights Reserved..
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import collections
|
|
import re
|
|
|
|
import netaddr
|
|
from oslo_log import log as logging
|
|
from oslo_utils import netutils
|
|
|
|
from tempest import config
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
from tempest.lib import exceptions as lib_exc
|
|
|
|
from vmware_nsx_tempest_plugin.common import constants
|
|
from vmware_nsx_tempest_plugin.tests.scenario import manager
|
|
|
|
CONF = config.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class ApplianceManager(manager.NetworkScenarioTest):
|
|
server_details = collections.namedtuple('server_details',
|
|
['server', 'floating_ips',
|
|
'networks'])
|
|
|
|
def setUp(self):
|
|
super(ApplianceManager, self).setUp()
|
|
self.topology_routers = {}
|
|
self.topology_networks = {}
|
|
self.topology_subnets = {}
|
|
self.topology_servers = {}
|
|
self.topology_servers_floating_ip = []
|
|
self.topology_public_network_id = CONF.network.public_network_id
|
|
self.topology_keypairs = {}
|
|
self.servers_details = {}
|
|
self.topology_port_ids = {}
|
|
self.image_ref = CONF.compute.image_ref
|
|
self.flavor_ref = CONF.compute.flavor_ref
|
|
self.run_ssh = CONF.validation.run_validation
|
|
self.ssh_user = CONF.validation.image_ssh_user
|
|
|
|
def get_internal_ips(self, server, network, device="network"):
|
|
internal_ips = [p['fixed_ips'][0]['ip_address'] for p in
|
|
self.os_admin.ports_client.list_ports(
|
|
tenant_id=server['tenant_id'],
|
|
network_id=network['id'])['ports'] if
|
|
p['device_owner'].startswith(device)]
|
|
return internal_ips
|
|
|
|
def get_internal_ipv6_ips(self, server, network, device="network"):
|
|
internal_ips = [p['fixed_ips'][1]['ip_address'] for p in
|
|
self.os_admin.ports_client.list_ports(
|
|
tenant_id=server['tenant_id'],
|
|
network_id=network['id'])['ports'] if
|
|
p['device_owner'].startswith(device)]
|
|
return internal_ips
|
|
|
|
def _verify_empty_security_group_status(self, security_group):
|
|
ip_protocols = ["IPV6", "IPV4"]
|
|
if CONF.network.backend == 'nsxp':
|
|
nsx_fw_section, nsx_fw_section_rules = \
|
|
self.nsx_client.get_firewall_section_and_rules(
|
|
security_group['name'], security_group['id'],
|
|
os_tenant_id='default')
|
|
elif CONF.network.backend == 'nsxv3':
|
|
nsx_fw_section, nsx_fw_section_rules = \
|
|
self.nsx_client.get_firewall_section_and_rules(
|
|
security_group['name'], security_group['id'])
|
|
msg = "Newly created empty security group does not meet criteria !!!"
|
|
if CONF.network.backend != 'nsxp':
|
|
self.assertEqual(nsx_fw_section["rule_count"], 2, msg)
|
|
self.assertEqual(nsx_fw_section_rules[0]["action"], "ALLOW", msg)
|
|
self.assertEqual(nsx_fw_section_rules[1]["action"], "ALLOW", msg)
|
|
self.assertEqual(nsx_fw_section_rules[0]["direction"], "OUT", msg)
|
|
self.assertEqual(nsx_fw_section_rules[1]["direction"], "OUT", msg)
|
|
self.assertIn(nsx_fw_section_rules[0]["ip_protocol"], ip_protocols,
|
|
msg)
|
|
self.assertIn(nsx_fw_section_rules[1]["ip_protocol"], ip_protocols,
|
|
msg)
|
|
|
|
def create_topology_empty_security_group(self, namestart="vmw_"):
|
|
security_group = self._create_empty_security_group(namestart=namestart)
|
|
self._verify_empty_security_group_status(security_group)
|
|
return security_group
|
|
|
|
def add_security_group_rule(self, security_group, rule,
|
|
ruleclient=None,
|
|
tenant_id=None, secclient=None):
|
|
return self._create_security_group_rule(
|
|
secgroup=security_group,
|
|
tenant_id=tenant_id,
|
|
sec_group_rules_client=ruleclient,
|
|
security_groups_client=secclient,
|
|
**rule)
|
|
|
|
def get_server_key(self, server):
|
|
return self.topology_keypairs[server['key_name']]['private_key']
|
|
|
|
def create_topology_router(self, router_name=None, routers_client=None,
|
|
tenant_id=None, set_gateway=True,
|
|
ext_netid=None, **kwargs):
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
if not tenant_id:
|
|
tenant_id = routers_client.tenant_id
|
|
if router_name:
|
|
router_name_ = constants.APPLIANCE_NAME_STARTS_WITH + router_name
|
|
if router_name:
|
|
name = data_utils.rand_name(router_name_)
|
|
else:
|
|
name = data_utils.rand_name()
|
|
if CONF.network.backend == "nsxv3":
|
|
router = routers_client.create_router(
|
|
name=name, admin_state_up=True, tenant_id=tenant_id)['router']
|
|
elif CONF.network.backend == "nsxv":
|
|
router = routers_client.create_router(
|
|
name=name, **kwargs)['router']
|
|
elif CONF.network.backend == "nsxp":
|
|
router = routers_client.create_router(
|
|
name=name, admin_state_up=True, tenant_id=tenant_id)['router']
|
|
if set_gateway is not False:
|
|
if not ext_netid:
|
|
external_network_id = self.topology_public_network_id
|
|
else:
|
|
external_network_id = ext_netid
|
|
if kwargs.get("enable_snat") is not None:
|
|
public_network_info = {"external_gateway_info": dict(
|
|
network_id=external_network_id,
|
|
enable_snat=kwargs["enable_snat"])}
|
|
else:
|
|
public_network_info = {"external_gateway_info": dict(
|
|
network_id=external_network_id)}
|
|
routers_client.update_router(router['id'], **public_network_info)
|
|
self.topology_routers[router_name] = router
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
routers_client.delete_router, router['id'])
|
|
return router
|
|
|
|
def get_server_nics(self, ssh_client):
|
|
reg = re.compile(r'(?P<num>\d+): (?P<nic_name>\w+)[@]?.*:')
|
|
ipatxt = ssh_client.exec_command("ip address")
|
|
return reg.findall(ipatxt)
|
|
|
|
def create_instance_interface(self, server):
|
|
old_floating_ip, server = self.floating_ip_tuple
|
|
ip_address = old_floating_ip['floating_ip_address']
|
|
private_key = self._get_server_key(server)
|
|
ssh_client = self.get_remote_client(
|
|
ip_address, private_key=private_key, server=server)
|
|
self.get_server_nics(ssh_client)
|
|
# get a port from a list of one item
|
|
port_list = self.os_admin.ports_client.list_ports(
|
|
device_id=server['id'])['ports']
|
|
self.assertEqual(1, len(port_list))
|
|
interface = self.interface_client.create_interface(
|
|
server_id=server['id'],
|
|
net_id=self.new_net['id'])['interfaceAttachment']
|
|
self.addCleanup(self.ports_client.wait_for_resource_deletion,
|
|
interface['port_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.interface_client.delete_interface,
|
|
server['id'], interface['port_id'])
|
|
|
|
def update_topology_router(
|
|
self, router_id, routers_client=None, **update_kwargs):
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
result = routers_client.update_router(router_id,
|
|
**update_kwargs)
|
|
return result
|
|
|
|
def delete_topology_router(
|
|
self, router_id, routers_client=None):
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
routers_client.delete_router(router_id)
|
|
|
|
def show_topology_router(
|
|
self, router_id, routers_client=None):
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
result = routers_client.show_router(router_id)
|
|
return result
|
|
|
|
def create_topology_network(
|
|
self, network_name, networks_client=None,
|
|
tenant_id=None, net_name_enhance=True, **kwargs):
|
|
if not networks_client:
|
|
networks_client = self.networks_client
|
|
if not tenant_id:
|
|
tenant_id = networks_client.tenant_id
|
|
if net_name_enhance:
|
|
network_name_ = constants.APPLIANCE_NAME_STARTS_WITH + network_name
|
|
name = data_utils.rand_name(network_name_)
|
|
else:
|
|
name = network_name
|
|
# Neutron disables port security by default so we have to check the
|
|
# config before trying to create the network with port_security_enabled
|
|
if 'port_security_enabled' not in kwargs.keys():
|
|
if CONF.network_feature_enabled.port_security:
|
|
kwargs['port_security_enabled'] = True
|
|
else:
|
|
kwargs['port_security_enabled'] = False
|
|
result = networks_client.create_network(
|
|
name=name, tenant_id=tenant_id, **kwargs)
|
|
network = result['network']
|
|
self.assertEqual(network['name'], name)
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
networks_client.delete_network, network['id'])
|
|
self.topology_networks[network_name] = network
|
|
return network
|
|
|
|
def create_provider_network(self, net_type,
|
|
zone_name,
|
|
admin_state_up=True,
|
|
tz_id=None,
|
|
vlan_id_unique=None):
|
|
networks_client = self.cmgr_adm.networks_client
|
|
if net_type == constants.VXLAN_TYPE:
|
|
name = "provider_network_vxlan"
|
|
body = {"provider:physical_network": tz_id,
|
|
"provider:network_type": net_type,
|
|
"admin_state_up": admin_state_up,
|
|
"dns_domain": zone_name}
|
|
elif net_type == constants.VLAN_TYPE:
|
|
name = "provider_network_vlan"
|
|
if vlan_id_unique is not None:
|
|
vlan_id_no = vlan_id_unique
|
|
else:
|
|
vlan_id_no = constants.VLAN
|
|
if tz_id is None:
|
|
body = {"provider:segmentation_id": vlan_id_no,
|
|
"provider:network_type": net_type,
|
|
"admin_state_up": admin_state_up,
|
|
"dns_domain": zone_name}
|
|
else:
|
|
body = {"provider:segmentation_id": vlan_id_no,
|
|
"provider:network_type": net_type,
|
|
"provider:physical_network": tz_id,
|
|
"admin_state_up": admin_state_up,
|
|
"dns_domain": zone_name}
|
|
network = self.create_topology_network(name,
|
|
networks_client=networks_client,
|
|
**body)
|
|
return network
|
|
|
|
def update_topology_network(
|
|
self, network_id, networks_client=None, **update_kwargs):
|
|
if not networks_client:
|
|
networks_client = self.networks_client
|
|
result = networks_client.update_network(network_id,
|
|
**update_kwargs)
|
|
return result
|
|
|
|
def delete_topology_network(
|
|
self, network_id, networks_client=None):
|
|
if not networks_client:
|
|
networks_client = self.networks_client
|
|
networks_client.delete_network(network_id)
|
|
|
|
def create_topology_subnet(
|
|
self, subnet_name, network, routers_client=None,
|
|
subnets_client=None, router_id=None, ip_version=4, cidr=None,
|
|
mask_bits=None, **kwargs):
|
|
subnet_name_ = constants.APPLIANCE_NAME_STARTS_WITH + subnet_name
|
|
if not subnets_client:
|
|
subnets_client = self.subnets_client
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
|
|
def cidr_in_use(cidr, tenant_id):
|
|
"""Check cidr existence
|
|
|
|
:returns: True if subnet with cidr already exist in tenant
|
|
False else
|
|
|
|
"""
|
|
cidr_in_use = \
|
|
self.os_admin.subnets_client.\
|
|
list_subnets(tenant_id=tenant_id, cidr=cidr)['subnets']
|
|
return len(cidr_in_use) != 0
|
|
|
|
if ip_version == 6:
|
|
tenant_cidr = (cidr or netaddr.IPNetwork(
|
|
CONF.network.project_network_v6_cidr))
|
|
mask_bits = mask_bits or CONF.network.project_network_v6_mask_bits
|
|
else:
|
|
tenant_cidr = cidr or netaddr.IPNetwork(
|
|
CONF.network.project_network_cidr)
|
|
mask_bits = mask_bits or CONF.network.project_network_mask_bits
|
|
str_cidr = str(tenant_cidr)
|
|
if not cidr:
|
|
# Repeatedly attempt subnet creation with sequential cidr
|
|
# blocks until an unallocated block is found.
|
|
for subnet_cidr in tenant_cidr.subnet(mask_bits):
|
|
str_cidr = str(subnet_cidr)
|
|
if not cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
|
|
break
|
|
else:
|
|
if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
|
|
LOG.error("Specified subnet %r is in use" % str_cidr)
|
|
raise
|
|
subnet = dict(name=data_utils.rand_name(subnet_name_),
|
|
network_id=network['id'], tenant_id=network['tenant_id'],
|
|
cidr=str_cidr, ip_version=ip_version, **kwargs)
|
|
try:
|
|
result = None
|
|
result = subnets_client.create_subnet(**subnet)
|
|
except lib_exc.Conflict as e:
|
|
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
|
|
if not is_overlapping_cidr:
|
|
raise
|
|
self.assertIsNotNone(result, 'Unable to allocate tenant network')
|
|
subnet = result['subnet']
|
|
self.assertEqual(subnet['cidr'], str_cidr)
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
subnets_client.delete_subnet, subnet['id'])
|
|
self.topology_subnets[subnet_name] = subnet
|
|
if router_id:
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
routers_client.add_router_interface(
|
|
router_id, subnet_id=subnet["id"])
|
|
self.addCleanup(
|
|
test_utils.call_and_ignore_notfound_exc,
|
|
routers_client.remove_router_interface, router_id,
|
|
subnet_id=subnet["id"])
|
|
return subnet
|
|
|
|
def create_topology_security_provider_group(
|
|
self, client=None, project_id=None, provider=False):
|
|
if client is None:
|
|
sg_client_admin = self.security_groups_client
|
|
else:
|
|
sg_client_admin = client.security_groups_client
|
|
sg_dict = dict(name=data_utils.rand_name('provider-sec-group'))
|
|
if project_id:
|
|
sg_dict['tenant_id'] = project_id
|
|
if provider:
|
|
sg_dict['provider'] = True
|
|
sg = sg_client_admin.create_security_group(**sg_dict)
|
|
sg = sg.get('security_group', sg)
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
sg_client_admin.delete_security_group,
|
|
sg.get('id'))
|
|
return sg
|
|
|
|
def _get_port_id(self, network_id, subnet_id, instance):
|
|
_, instance_addr = instance["addresses"].items()[0]
|
|
instance_fixed_ip = instance_addr[0]["addr"]
|
|
for port in self._list_ports():
|
|
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
|
|
if port["network_id"] == network_id and port["fixed_ips"][0][
|
|
"subnet_id"] == subnet_id and instance["id"] == port[
|
|
"device_id"] and port_fixed_ip == instance_fixed_ip:
|
|
port_id = port["id"]
|
|
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
|
|
return port_id
|
|
|
|
def create_topology_security_group(self, **kwargs):
|
|
return self._create_security_group(**kwargs)
|
|
|
|
def update_topology_security_group(self, sg_id, client=None,
|
|
**updated_kwargs):
|
|
sg = self.security_groups_client.\
|
|
update_security_group(sg_id, **updated_kwargs)
|
|
return sg
|
|
|
|
def delete_topology_security_group(self, sg_id, client=None):
|
|
sg = self.security_groups_client.delete_security_group(sg_id)
|
|
return sg
|
|
|
|
def create_topology_security_group_rule(self, tenant_id, sg,
|
|
sec_rule_client=None,
|
|
sec_client=None,
|
|
**kwargs):
|
|
rule = self._create_security_group_rule(
|
|
tenant_id=tenant_id,
|
|
sec_group_rules_client=sec_rule_client,
|
|
security_groups_client=sec_client,
|
|
secgroup=sg,
|
|
**kwargs)
|
|
return rule
|
|
|
|
def _get_server_portid_and_ip4(self, server, ip_addr=None):
|
|
ports = self.os_admin.ports_client.list_ports(
|
|
device_id=server['id'], fixed_ip=ip_addr)['ports']
|
|
p_status = ['ACTIVE']
|
|
if getattr(CONF.service_available, 'ironic', False):
|
|
p_status.append('DOWN')
|
|
port_map = [(p["id"], fxip["ip_address"])
|
|
for p in ports
|
|
for fxip in p["fixed_ips"]
|
|
if netutils.is_valid_ipv4(fxip["ip_address"]) and
|
|
p['status'] in p_status]
|
|
inactive = [p for p in ports if p['status'] != 'ACTIVE']
|
|
if inactive:
|
|
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
|
|
|
|
self.assertNotEqual(0, len(port_map),
|
|
"No IPv4 addresses found in: %s" % ports)
|
|
return port_map
|
|
|
|
def add_router_interface(
|
|
self,
|
|
router_id,
|
|
subnet_id,
|
|
router_client=None):
|
|
if router_client is None:
|
|
router_client = self.routers_client
|
|
interface = router_client.add_router_interface(router_id,
|
|
subnet_id=subnet_id)
|
|
self.addCleanup(
|
|
test_utils.call_and_ignore_notfound_exc,
|
|
router_client.remove_router_interface, router_id,
|
|
subnet_id=subnet_id)
|
|
return interface
|
|
|
|
def remove_router_interface(
|
|
self,
|
|
router_id,
|
|
subnet_id,
|
|
router_client=None):
|
|
if router_client is None:
|
|
router_client = self.routers_client
|
|
router_client.remove_router_interface(router_id,
|
|
subnet_id=subnet_id)
|
|
|
|
def update_subnet(self, subnet_id, subnet_client=None, **kwargs):
|
|
if subnet_client is None:
|
|
subnet_client = self.subnets_client
|
|
result = subnet_client.update_subnet(subnet_id, **kwargs)
|
|
subnet = result['subnet']
|
|
return subnet
|
|
|
|
def create_floatingip(self, thing=None, port_id=None,
|
|
external_network_id=None,
|
|
ip4=None, client=None):
|
|
"""Create a floating IP and associates to a resource/port on Neutron"""
|
|
if not external_network_id:
|
|
external_network_id = self.topology_public_network_id
|
|
if not client:
|
|
client = self.floating_ips_client
|
|
if thing is None and port_id is None:
|
|
result = client.create_floatingip(
|
|
floating_network_id=external_network_id)
|
|
else:
|
|
result = client.create_floatingip(
|
|
floating_network_id=external_network_id,
|
|
port_id=port_id,
|
|
tenant_id=thing['tenant_id'],
|
|
fixed_ip_address=ip4
|
|
)
|
|
floating_ip = result['floatingip']
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
client.delete_floatingip,
|
|
floating_ip['id'])
|
|
return floating_ip
|
|
|
|
def create_topology_instance(
|
|
self, server_name, networks=None, security_groups=None,
|
|
config_drive=None, keypair=None, image_id=None, flavor=None,
|
|
clients=None, create_floating_ip=True, port=None, **kwargs):
|
|
# Define security group for server.
|
|
if security_groups:
|
|
if security_groups != 'undef':
|
|
kwargs["security_groups"] = security_groups
|
|
else:
|
|
_sg = self.create_topology_security_group()
|
|
_security_groups = [{'name': _sg['name']}]
|
|
kwargs["security_groups"] = _security_groups
|
|
# Define config drive for server.
|
|
if not keypair:
|
|
if clients:
|
|
client = clients.keypairs_client
|
|
keypair = self.create_keypair(client)
|
|
else:
|
|
keypair = self.create_keypair()
|
|
self.topology_keypairs[keypair['name']] = keypair
|
|
kwargs["key_name"] = keypair['name']
|
|
else:
|
|
kwargs["key_name"] = keypair['name']
|
|
# Define image id for server.
|
|
if image_id:
|
|
kwargs["image_id"] = image_id
|
|
server_name_ = constants.APPLIANCE_NAME_STARTS_WITH + server_name
|
|
# Collect all the networks for server.
|
|
networks_ = []
|
|
if networks is not None:
|
|
for net in networks:
|
|
net_ = {"uuid": net["id"]}
|
|
networks_.append(net_)
|
|
# Deploy instance with port
|
|
if port is not None:
|
|
port_ = {"port": port['id']}
|
|
networks_.append(port_)
|
|
# Deploy server with all the args.
|
|
server = self.create_server(name=server_name_, networks=networks_,
|
|
flavor=flavor, clients=clients, **kwargs)
|
|
floating_ips = []
|
|
if create_floating_ip:
|
|
ports = self._get_server_portid_and_ip4(server)
|
|
for port_id, ip4 in ports:
|
|
if clients is None:
|
|
floating_ip = self.create_floatingip(server, port_id,
|
|
ip4=ip4)
|
|
else:
|
|
floating_ip = self.\
|
|
create_floatingip(server,
|
|
port_id, ip4=ip4,
|
|
client=clients.floating_ips_client)
|
|
if server.get("floating_ips"):
|
|
server["floating_ips"].append(floating_ip)
|
|
else:
|
|
server["floating_ips"] = [floating_ip]
|
|
self.topology_servers_floating_ip.append(floating_ip)
|
|
floating_ips.append(floating_ip)
|
|
server_details = self.server_details(server=server,
|
|
floating_ips=floating_ips,
|
|
networks=networks)
|
|
self.servers_details[server_name] = server_details
|
|
self.topology_servers[server_name] = server
|
|
return server
|
|
|
|
def create_topology_port(self, network,
|
|
ports_client=None, **args):
|
|
if not ports_client:
|
|
ports_client = self.ports_client
|
|
port = ports_client.create_port(network_id=network['id'], **args)
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
ports_client.delete_port, port['port']['id'])
|
|
return port
|
|
|
|
def update_topology_port(self, port_id, ports_client=None,
|
|
**kwargs):
|
|
if not ports_client:
|
|
ports_client = self.ports_client
|
|
port = ports_client.update_port(port_id, **kwargs)
|
|
return port
|
|
|
|
def show_topology_port(self, port_id, ports_client=None,
|
|
**kwargs):
|
|
if not ports_client:
|
|
ports_client = self.ports_client
|
|
port = ports_client.show_port(port_id, **kwargs)
|
|
return port
|
|
|
|
def delete_topology_port(self, port_id, ports_client=None,
|
|
**kwargs):
|
|
if not ports_client:
|
|
ports_client = self.ports_client
|
|
port = ports_client.delete_port(port_id, **kwargs)
|
|
return port
|
|
|
|
def _list_ports(self, *args, **kwargs):
|
|
"""List ports using admin creds """
|
|
ports_list = self.os_admin.ports_client.list_ports(
|
|
*args, **kwargs)
|
|
return ports_list['ports']
|
|
|
|
def get_network_ports_id(self):
|
|
for port in self._list_ports():
|
|
for fixed_ip in port["fixed_ips"]:
|
|
ip = fixed_ip["ip_address"]
|
|
port_id = port["id"]
|
|
tenant_id = port["tenant_id"]
|
|
if tenant_id in self.topology_port_ids:
|
|
self.topology_port_ids[tenant_id][ip] = port_id
|
|
else:
|
|
self.topology_port_ids[tenant_id] = {ip: port_id}
|
|
|
|
def get_glance_image_id(self, params):
|
|
"""
|
|
Get the glance image id based on the params
|
|
|
|
:param params: List of sub-string of image name
|
|
:return:
|
|
"""
|
|
# Retrieve the list of images that meet the filter
|
|
images_list = self.os_admin.image_client_v2.list_images()['images']
|
|
# Validate that the list was fetched sorted accordingly
|
|
msg = "No images were found that met the filter criteria."
|
|
self.assertNotEmpty(images_list, msg)
|
|
image_id = None
|
|
for image in images_list:
|
|
for param in params:
|
|
if not param.lower() in image["name"].lower():
|
|
break
|
|
else:
|
|
image_id = image["id"]
|
|
break
|
|
self.assertIsNotNone(image_id, msg)
|
|
return image_id
|
|
|
|
def get_flavor_id(self, params):
|
|
"""
|
|
Get the flavor id based on the params
|
|
|
|
:param params: List of sub-string of flavor
|
|
:return:
|
|
"""
|
|
# Retrieve the list of images that meet the filter
|
|
flavors_list = self.os_admin.flavors_client.list_flavors()['flavors']
|
|
# Validate that the list was fetched sorted accordingly
|
|
msg = "No flavors were found that met the filter criteria."
|
|
self.assertNotEmpty(flavors_list, msg)
|
|
flavor_id = None
|
|
for flavor in flavors_list:
|
|
for param in params:
|
|
if not param.lower() in flavor["name"].lower():
|
|
break
|
|
else:
|
|
flavor_id = flavor["id"]
|
|
break
|
|
self.assertIsNotNone(flavor_id, msg)
|
|
return flavor_id
|
|
|
|
def get_user_id(self, client_id):
|
|
"""
|
|
Get the user id based on the openstack client
|
|
"""
|
|
if client_id == 'network':
|
|
user_id = self.networks_client.user_id
|
|
tenant_id = self.networks_client.tenant_id
|
|
elif client_id == 'router':
|
|
user_id = self.routers_client.user_id
|
|
tenant_id = self.routers_client.tenant_id
|
|
elif client_id == 'sg':
|
|
user_id = self.security_groups_client.user_id
|
|
tenant_id = self.security_groups_client.tenant_id
|
|
return user_id, tenant_id
|
|
|
|
def _create_subnet_pool(self, client, name, default_quota=None,
|
|
shared='false'):
|
|
# create subnet pool
|
|
prefix = CONF.network.default_network
|
|
subnetpool_client = client.subnetpools_client
|
|
if default_quota is None:
|
|
body = subnetpool_client.create_subnetpool(name=name,
|
|
prefixes=prefix,
|
|
shared=shared)
|
|
else:
|
|
body = subnetpool_client.create_subnetpool(
|
|
name=name, prefixes=prefix, shared=shared,
|
|
default_quota=default_quota)
|
|
subnetpool_id = body["subnetpool"]["id"]
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
subnetpool_client.delete_subnetpool,
|
|
subnetpool_id)
|
|
return body
|
|
|
|
def _create_topo_single_network(self, namestart, create_instance=True,
|
|
set_gateway=True, cidr=None, clean_up=True,
|
|
**kwargs):
|
|
"""
|
|
Create Topo where 1 logical switches which is
|
|
connected via tier-1 router.
|
|
"""
|
|
rtr_name = data_utils.rand_name(name='tempest-uni-router')
|
|
network_name = data_utils.rand_name(name='tempest-uni-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-uni-subnet')
|
|
router_state = self.create_topology_router(rtr_name,
|
|
set_gateway=set_gateway,
|
|
clean_up=clean_up,
|
|
**kwargs)
|
|
network_state = self.create_topology_network(network_name,
|
|
clean_up=clean_up)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state, router_id=router_state["id"],
|
|
cidr=cidr, clean_up=clean_up)
|
|
if create_instance:
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
self.create_topology_instance(
|
|
"state_vm_1", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clean_up=clean_up)
|
|
self.create_topology_instance(
|
|
"state_vm_2", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clean_up=clean_up)
|
|
topology_dict = dict(router_state=router_state,
|
|
network_state=network_state,
|
|
subnet_state=subnet_state)
|
|
return topology_dict
|