Merge "Adding Qos and Maclearning IPv6 API tests"

This commit is contained in:
Zuul 2019-05-31 15:54:51 +00:00 committed by Gerrit Code Review
commit f0ccc47ee8
2 changed files with 480 additions and 0 deletions

View File

@ -0,0 +1,288 @@
# Copyright 2019 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxv3_client
import time
CONF = config.CONF
CONF.validation.auth_method = 'None'
LOG = logging.getLogger(__name__)
class TestMacLearning(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestMacLearning, cls).skip_checks()
if not (CONF.network.project_networks_reachable or
CONF.network.public_network_id):
msg = ('Either project_networks_reachable must be "true", or '
'public_network_id must be defined.')
raise cls.skipException(msg)
if not CONF.network.public_network_cidr:
msg = "public_network_cidr must be defined in network section."
raise cls.skipException(msg)
if not (CONF.network_feature_enabled.ipv6 and
CONF.network_feature_enabled.ipv6_subnet_attributes):
raise cls.skipException('IPv6 or its attributes not supported')
@classmethod
def setup_clients(cls):
super(TestMacLearning, cls).setup_clients()
cls.cmgr_adm = cls.get_client_manager('admin')
@classmethod
def resource_setup(cls):
super(TestMacLearning, cls).resource_setup()
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def _create_ipv6_subnet(self, network, router, slaac=False):
subnet_client = self.cmgr_adm.subnets_client
subnet_name = network['name'] + 'ipv6-sub'
address_cidr = CONF.network.project_network_v6_cidr
address_prefixlen = CONF.network.project_network_v6_mask_bits
if ((address_prefixlen >= 126)):
msg = ("Subnet %s isn't large enough for the test" % address_cidr)
raise exceptions.InvalidConfiguration(msg)
allocation_pools = {'allocation_pools': [{
'start': str(address_cidr).split('/')[0] + '2',
'end':str(address_cidr).split('/')[0] + '70'}]}
if slaac:
subnet = self.create_topology_subnet(subnet_name, network,
subnets_client=subnet_client,
routers_client=self.cmgr_adm.routers_client,
router_id=router['id'],
ip_version=6, ipv6_ra_mode='slaac',
ipv6_address_mode='slaac',
**allocation_pools)
else:
subnet = self.create_topology_subnet(subnet_name, network,
subnets_client=subnet_client,
ip_version=6, enable_dhcp=False,
**allocation_pools)
return subnet
def _create_mac_learn_enabled_port(self, slaac=False):
# Create Port with required port security/sec groups config
rtr_name = data_utils.rand_name("mac-router-")
router = self.create_topology_router(
rtr_name, routers_client=self.cmgr_adm.routers_client)
name = data_utils.rand_name("qos-network")
networks_client = self.cmgr_adm.networks_client
network = self.create_topology_network(name,
networks_client=networks_client)
self._create_ipv6_subnet(network, router, slaac)
test_port_name = data_utils.rand_name('port-')
port_client = self.cmgr_adm.ports_client
body = self.create_topology_port(network=network,
ports_client=port_client, mac_learning_enabled=True,
port_security_enabled=False, security_groups=[],
name=test_port_name)
return body['port']
def _update_port_disable_mac_learning(self, port, sec_groups=None):
if sec_groups is None:
sec_groups = []
updated_sec_grp = port['security_groups'] + sec_groups
body = self.update_topology_port(port['id'],
ports_client=self.cmgr_adm.ports_client,
mac_learning_enabled=False,
port_security_enabled=True,
security_groups=updated_sec_grp)
return body['port']
def _update_port_enable_mac_learning(self, port):
body = self.update_topology_port(port['id'],
ports_client=self.cmgr_adm.ports_client,
mac_learning_enabled=True,
port_security_enabled=False,
security_groups=[])
return body['port']
def _conv_switch_prof_to_dict(self, switch_profiles):
switch_prof_dict = {}
for i in range(len(switch_profiles)):
switch_prof_dict.update(
{switch_profiles[i]['key']: switch_profiles[i]['value']})
return switch_prof_dict
def _get_nsx_mac_learning_enabled(self, port):
mac_learn_set_bool = False
# Get nsxv3 port(expects 'name' set)
nsx_port = self.nsx.get_logical_port(port['name'])
# Get list of logical port's switch profiles
port_swtch_profs = nsx_port['switching_profile_ids']
# Convert switch profiles list to dict, key:UUID
port_sw_prof_dict = self._conv_switch_prof_to_dict(port_swtch_profs)
# Get MAC learning switch profile ID
mac_sw_prof_id = port_sw_prof_dict[constants.MAC_SW_PROFILE]
# Get MAC learning switch profile json
mac_sw_profile_json = self.nsx.get_switching_profile(mac_sw_prof_id)
# Get mac-learning state for port
if ('mac_learning' in mac_sw_profile_json):
nsxport_mac_learning = mac_sw_profile_json[
'mac_learning']['enabled']
if nsxport_mac_learning:
mac_learn_set_bool = True
return mac_learn_set_bool
def _check_mac_learning(self, port, mac_learn_state=True):
# Enabling MAC Learning requires port security=False and no sec grps
nsxport_mac_learning = self._get_nsx_mac_learning_enabled(port)
if mac_learn_state:
self.assertEmpty(port['security_groups'],
"Sec grp for mac learn port is not empty")
self.assertFalse(port['port_security_enabled'],
"Port security is enabled")
self.assertTrue(port['mac_learning_enabled'],
"Mac Learning is not enabled")
self.assertEqual(nsxport_mac_learning,
port['mac_learning_enabled'],
"OS and NSX mac learn states don't match")
else:
self.assertTrue(port['port_security_enabled'],
"Port security is disabled")
if 'mac_learning_enabled' in port.keys():
self.assertFalse(port['mac_learning_enabled'],
"Mac Learning is enabled")
self.assertEqual(nsxport_mac_learning,
port['mac_learning_enabled'],
"OS and NSX mac learn states don't match")
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('811e18ab-5767-4185-8f5d-663ff19aa335')
def test_create_update_delete_mac_learning_port_static_ipv6(self):
"""
CRUD Workflow 3
Create port with MAC Learning enabled
Update port(non-MAC Learning settings)
Delete port
"""
test_port = self._create_mac_learn_enabled_port()
new_port_name = data_utils.rand_name('updated_port-')
body = self.update_topology_port(test_port['id'],
ports_client=self.cmgr_adm.ports_client,
name=new_port_name)
updated_port = body['port']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
updated_nsx_port = self.nsx.get_logical_port(updated_port['name'])
self.assertEqual(updated_nsx_port['display_name'],
updated_port['name'],
"Updated port names do not match OS and NSX")
self.delete_topology_port(test_port['id'],
ports_client=self.cmgr_adm.ports_client)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNone(self.nsx.get_logical_port(updated_port['name']),
"Logical port %s is not None" % updated_port['name'])
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('cf0791c9-88c7-461c-9d86-429cac2cddc8')
def test_create_update_delete_mac_learning_port_slaac_ipv6(self):
"""
CRUD Workflow 3
Create port with MAC Learning enabled
Update port(non-MAC Learning settings)
Delete port
"""
test_port = self._create_mac_learn_enabled_port(slaac=True)
new_port_name = data_utils.rand_name('updated_port-')
body = self.update_topology_port(test_port['id'],
ports_client=self.cmgr_adm.ports_client,
name=new_port_name)
updated_port = body['port']
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
updated_nsx_port = self.nsx.get_logical_port(updated_port['name'])
self.assertEqual(updated_nsx_port['display_name'],
updated_port['name'],
"Updated port names do not match OS and NSX")
self.delete_topology_port(test_port['id'],
ports_client=self.cmgr_adm.ports_client)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNone(self.nsx.get_logical_port(updated_port['name']),
"Logical port %s is not None" % updated_port['name'])
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('e1af7c44-f859-4d88-b0eb-7c863f149868')
def test_toggle_mac_learning_port_static_ipv6_delete(self):
"""
CRUD Workflow 2
Create port with MAC Learning enabled
Update port, disabling MAC Learning
Update port, re-enabling MAC Learning
Delete port
"""
test_port = self._create_mac_learn_enabled_port()
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self._check_mac_learning(test_port, mac_learn_state=True)
ml_off_port = self._update_port_disable_mac_learning(test_port)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self._check_mac_learning(ml_off_port, mac_learn_state=False)
ml_on_port = self._update_port_enable_mac_learning(ml_off_port)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self._check_mac_learning(ml_on_port, mac_learn_state=True)
self.delete_topology_port(ml_on_port['id'],
ports_client=self.cmgr_adm.ports_client)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNone(self.nsx.get_logical_port(ml_on_port['name']),
"Port %s is not None" % ml_on_port['name'])
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('8f822bc6-9801-4f6f-acda-6dec3a7bb443')
def test_toggle_mac_learning_port_slaac_ipv6_delete(self):
"""
CRUD Workflow 2
Create port with MAC Learning enabled
Update port, disabling MAC Learning
Update port, re-enabling MAC Learning
Delete port
"""
test_port = self._create_mac_learn_enabled_port(slaac=True)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self._check_mac_learning(test_port, mac_learn_state=True)
ml_off_port = self._update_port_disable_mac_learning(test_port)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self._check_mac_learning(ml_off_port, mac_learn_state=False)
ml_on_port = self._update_port_enable_mac_learning(ml_off_port)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self._check_mac_learning(ml_on_port, mac_learn_state=True)
self.delete_topology_port(ml_on_port['id'],
ports_client=self.cmgr_adm.ports_client)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNone(self.nsx.get_logical_port(ml_on_port['name']),
"Port %s is not None" % ml_on_port['name'])

View File

@ -0,0 +1,192 @@
# Copyright 2019 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest_plugin.lib import feature_manager
CONF = config.CONF
CONF.validation.auth_method = 'None'
LOG = logging.getLogger(__name__)
DSCP_MARK = 12
DSCP_MARK_UPDATED = 16
BW_VALUE_KBPS = 1024
BW_VALUE_MBPS = 1
UPDATED_BW_VALUE_KBPS = 2048
UPDATED_BW_VALUE_MBPS = 2
MAX_BURST_KBPS = 1024000
MAX_BURST_MBPS = 1
class TestQos(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestQos, cls).skip_checks()
if not (CONF.network.project_networks_reachable or
CONF.network.public_network_id):
msg = ('Either project_networks_reachable must be "true", or '
'public_network_id must be defined.')
raise cls.skipException(msg)
if not CONF.network.public_network_cidr:
msg = "public_network_cidr must be defined in network section."
raise cls.skipException(msg)
if not test.is_extension_enabled('qos', 'network'):
msg = "q-qos extension not enabled."
raise cls.skipException(msg)
if not (CONF.network_feature_enabled.ipv6 and
CONF.network_feature_enabled.ipv6_subnet_attributes):
raise cls.skipException('IPv6 or its attributes not supported')
@classmethod
def setup_clients(cls):
super(TestQos, cls).setup_clients()
cls.cmgr_adm = cls.get_client_manager('admin')
@classmethod
def resource_setup(cls):
super(TestQos, cls).resource_setup()
def _create_ipv6_subnet(self, network, router, slaac=False):
subnet_client = self.cmgr_adm.subnets_client
subnet_name = network['name'] + 'ipv6-sub'
address_cidr = CONF.network.project_network_v6_cidr
address_prefixlen = CONF.network.project_network_v6_mask_bits
if ((address_prefixlen >= 126)):
msg = ("Subnet %s isn't large enough for the test" % address_cidr)
raise exceptions.InvalidConfiguration(msg)
allocation_pools = {'allocation_pools': [{
'start': str(address_cidr).split('/')[0] + '2',
'end':str(address_cidr).split('/')[0] + '70'}]}
if slaac:
subnet = self.create_topology_subnet(subnet_name, network,
subnets_client=subnet_client,
routers_client=self.cmgr_adm.routers_client,
router_id=router['id'],
ip_version=6, ipv6_ra_mode='slaac',
ipv6_address_mode='slaac',
**allocation_pools)
else:
subnet = self.create_topology_subnet(subnet_name, network,
subnets_client=subnet_client,
ip_version=6, enable_dhcp=False,
**allocation_pools)
return subnet
def _create_single_ipv6_rtr_topology(self, slaac=False):
"""Create IPv6 subnet and attach them to a router
"""
rtr_name = data_utils.rand_name("qos-rtr")
router = self.create_topology_router(
rtr_name, routers_client=self.cmgr_adm.routers_client)
name = data_utils.rand_name("qos-network")
networks_client = self.cmgr_adm.networks_client
network = self.create_topology_network(name,
networks_client=networks_client)
self._create_ipv6_subnet(network, router, slaac)
return network, router
def _create_qos_policy(self):
"""Create a qos policy with bandwidth limit rule
and dscp rule
"""
name = data_utils.rand_name('test-qos-policy-')
policy = self.create_qos_policy(name,
description='bw_dscp_rule',
shared=False)
# add bw rule
self.create_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=BW_VALUE_KBPS,
max_burst_kbps=MAX_BURST_KBPS)
# add dscp rule
self.create_dscp_marking_rule(
policy_id=policy['id'], dscp_mark=DSCP_MARK)
return policy
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('b46d699f-9bd5-4430-a816-167ffed66551')
def test_qos_policy_assoc_ipv6_subnet_static_ip(self):
"""Create network with IPv6 static assignment based subnet
Associate qos policy to the network
"""
network, _ = self._create_single_ipv6_rtr_topology()
policy = self. _create_qos_policy()
self.cmgr_adm.networks_client.update_network(
network['id'], qos_policy_id=policy['id'])
updated_network = self.cmgr_adm.networks_client.show_network(
network['id'])
self.assertEqual(updated_network['network']['qos_policy_id'],
policy['id'])
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('a121d38e-d977-4bd0-b7db-6f7d9e206f7a')
def test_qos_policy_assoc_ipv6_subnet_slaac_ip(self):
"""Create network with IPv6 SLAAC subnet
Associate qos policy to the network
"""
policy = self. _create_qos_policy()
network, _ = self._create_single_ipv6_rtr_topology(slaac=True)
self.cmgr_adm.networks_client.update_network(
network['id'], qos_policy_id=policy['id'])
updated_network = self.cmgr_adm.networks_client.show_network(
network['id'])
self.assertEqual(updated_network['network']['qos_policy_id'],
policy['id'])
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('7a6a2088-612f-4fc6-87be-d58c5d04b946')
def test_qos_policy_assoc_ipv6_port_static_ip(self):
"""Create network with IPv6 static assignment based subnet
Associate qos policy to a port attached to the ipv6
subnet
"""
network, _ = self._create_single_ipv6_rtr_topology()
policy = self. _create_qos_policy()
port_client = self.cmgr_adm.ports_client
body = self.create_topology_port(network=network,
ports_client=port_client, qos_policy_id=policy['id'])
port = body['port']
body = self.show_topology_port(port['id'],
ports_client=port_client)
port = body['port']
self.assertEqual(port['qos_policy_id'],
policy['id'])
@decorators.attr(type=['nsxv3', 'positive'])
@decorators.idempotent_id('f6297a98-f487-4403-852b-61fd0fce329c')
def test_qos_policy_assoc_ipv6_port_slaac_ip(self):
"""Create network with IPv6 SLAAC subnet
Associate qos policy to a port attached
to the ipv6 subnet
"""
policy = self. _create_qos_policy()
network, _ = self._create_single_ipv6_rtr_topology(slaac=True)
port_client = self.cmgr_adm.ports_client
self.create_topology_port(network=network,
ports_client=port_client)
self.cmgr_adm.networks_client.update_network(
network['id'], qos_policy_id=policy['id'])
updated_network = self.cmgr_adm.networks_client.show_network(
network['id'])
self.assertEqual(updated_network['network']['qos_policy_id'],
policy['id'])