a624cf560a
Change tenant_network_cidr, tenant_network_mask_bits, tenant_network_v6_cidr ,tenant_network_v6_mask_bits to project_network_cidr, project_network_mask_bits, project_network_v6_cidr, project_network_v6_mask_bits Change-Id: Ia679b9d855e55d8718a13f54c8284f46778cad67
158 lines
6.1 KiB
Python
158 lines
6.1 KiB
Python
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import netaddr
|
|
|
|
from tempest.api.network import base
|
|
from tempest import config
|
|
from tempest import exceptions
|
|
from tempest.lib.common.utils import data_utils
|
|
import tempest.test
|
|
|
|
CONF = config.CONF
|
|
|
|
|
|
class BaseDvsAdminNetworkTest(base.BaseAdminNetworkTest):
|
|
|
|
@classmethod
|
|
def resource_cleanup(cls):
|
|
for port in cls.ports:
|
|
cls.admin_ports_client.delete_port(port['id'])
|
|
for subnet in cls.subnets:
|
|
cls.admin_subnets_client.delete_subnet(subnet['id'])
|
|
for network in cls.networks:
|
|
cls.admin_networks_client.delete_network(network['id'])
|
|
# clean up ports, subnets and networks
|
|
cls.ports = []
|
|
cls.subnets = []
|
|
cls.networks = []
|
|
|
|
@classmethod
|
|
def create_network(cls, **kwargs):
|
|
"""Wrapper utility that returns a test admin provider network."""
|
|
network_name = (kwargs.get('net_name')
|
|
or data_utils.rand_name('test-adm-net-'))
|
|
net_type = kwargs.get('net_type', "flat")
|
|
if tempest.test.is_extension_enabled('provider', 'network'):
|
|
body = {'name': network_name}
|
|
body.update({'provider:network_type': net_type,
|
|
'provider:physical_network': 'dvs'})
|
|
if net_type == 'vlan':
|
|
_vlanid = kwargs.get('seg_id')
|
|
body.update({'provider:segmentation_id': _vlanid})
|
|
|
|
body = cls.admin_networks_client.create_network(**body)
|
|
network = body['network']
|
|
cls.networks.append(network)
|
|
return network
|
|
|
|
@classmethod
|
|
def create_subnet(cls, network):
|
|
"""Wrapper utility that returns a test subnet."""
|
|
# The cidr and mask_bits depend on the ip version.
|
|
if cls._ip_version == 4:
|
|
cidr = netaddr.IPNetwork(CONF.network.project_network_cidr
|
|
or "192.168.101.0/24")
|
|
mask_bits = CONF.network.project_network_mask_bits or 24
|
|
elif cls._ip_version == 6:
|
|
cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr)
|
|
mask_bits = CONF.network.project_network_v6_mask_bits
|
|
# Find a cidr that is not in use yet and create a subnet with it
|
|
for subnet_cidr in cidr.subnet(mask_bits):
|
|
try:
|
|
body = cls.admin_subnets_client.create_subnet(
|
|
network_id=network['id'],
|
|
cidr=str(subnet_cidr),
|
|
ip_version=cls._ip_version)
|
|
break
|
|
except exceptions.BadRequest as e:
|
|
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
|
|
if not is_overlapping_cidr:
|
|
raise
|
|
else:
|
|
message = 'Available CIDR for subnet creation could not be found'
|
|
raise exceptions.BuildErrorException(message)
|
|
subnet = body['subnet']
|
|
cls.subnets.append(subnet)
|
|
return subnet
|
|
|
|
@classmethod
|
|
def create_port(cls, network_id, **kwargs):
|
|
"""Wrapper utility that returns a test port."""
|
|
body = cls.admin_ports_client.create_port(network_id=network_id,
|
|
**kwargs)
|
|
port = body['port']
|
|
cls.ports.append(port)
|
|
return port
|
|
|
|
@classmethod
|
|
def update_network(cls, network_id, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_networks_client
|
|
return net_client.update_network(network_id, **kwargs)
|
|
|
|
@classmethod
|
|
def delete_network(cls, network_id, client=None):
|
|
net_client = client if client else cls.admin_networks_client
|
|
return net_client.delete_network(network_id)
|
|
|
|
@classmethod
|
|
def show_network(cls, network_id, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_networks_client
|
|
return net_client.show_network(network_id, **kwargs)
|
|
|
|
@classmethod
|
|
def list_networks(cls, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_networks_client
|
|
return net_client.list_networks(**kwargs)
|
|
|
|
@classmethod
|
|
def update_subnet(cls, subnet_id, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_subnets_client
|
|
return net_client.update_subnet(subnet_id, **kwargs)
|
|
|
|
@classmethod
|
|
def delete_subnet(cls, subnet_id, client=None):
|
|
net_client = client if client else cls.admin_subnets_client
|
|
return net_client.delete_subnet(subnet_id)
|
|
|
|
@classmethod
|
|
def show_subnet(cls, subnet_id, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_subnets_client
|
|
return net_client.show_subnet(subnet_id, **kwargs)
|
|
|
|
@classmethod
|
|
def list_subnets(cls, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_subnets_client
|
|
return net_client.list_subnets(**kwargs)
|
|
|
|
@classmethod
|
|
def delete_port(cls, port_id, client=None):
|
|
net_client = client if client else cls.admin_ports_client
|
|
return net_client.delete_port(port_id)
|
|
|
|
@classmethod
|
|
def show_port(cls, port_id, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_ports_client
|
|
return net_client.show_port(port_id, **kwargs)
|
|
|
|
@classmethod
|
|
def list_ports(cls, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_ports_client
|
|
return net_client.list_ports(**kwargs)
|
|
|
|
@classmethod
|
|
def update_port(cls, port_id, client=None, **kwargs):
|
|
net_client = client if client else cls.admin_ports_client
|
|
return net_client.update_port(port_id, **kwargs)
|