diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_nsx_mac_learning.py b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_nsx_mac_learning.py new file mode 100644 index 0000000..090b064 --- /dev/null +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_nsx_mac_learning.py @@ -0,0 +1,288 @@ +# Copyright 2019 VMware Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log as logging + +from tempest import config +from tempest.lib.common.utils import data_utils +from tempest.lib import decorators +from tempest.lib import exceptions + +from vmware_nsx_tempest_plugin.common import constants +from vmware_nsx_tempest_plugin.lib import feature_manager +from vmware_nsx_tempest_plugin.services import nsxv3_client + +import time + +CONF = config.CONF +CONF.validation.auth_method = 'None' + +LOG = logging.getLogger(__name__) + + +class TestMacLearning(feature_manager.FeatureManager): + + @classmethod + def skip_checks(cls): + super(TestMacLearning, cls).skip_checks() + if not (CONF.network.project_networks_reachable or + CONF.network.public_network_id): + msg = ('Either project_networks_reachable must be "true", or ' + 'public_network_id must be defined.') + raise cls.skipException(msg) + if not CONF.network.public_network_cidr: + msg = "public_network_cidr must be defined in network section." + raise cls.skipException(msg) + if not (CONF.network_feature_enabled.ipv6 and + CONF.network_feature_enabled.ipv6_subnet_attributes): + raise cls.skipException('IPv6 or its attributes not supported') + + @classmethod + def setup_clients(cls): + super(TestMacLearning, cls).setup_clients() + cls.cmgr_adm = cls.get_client_manager('admin') + + @classmethod + def resource_setup(cls): + super(TestMacLearning, cls).resource_setup() + cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) + + def _create_ipv6_subnet(self, network, router, slaac=False): + subnet_client = self.cmgr_adm.subnets_client + subnet_name = network['name'] + 'ipv6-sub' + address_cidr = CONF.network.project_network_v6_cidr + address_prefixlen = CONF.network.project_network_v6_mask_bits + if ((address_prefixlen >= 126)): + msg = ("Subnet %s isn't large enough for the test" % address_cidr) + raise exceptions.InvalidConfiguration(msg) + allocation_pools = {'allocation_pools': [{ + 'start': str(address_cidr).split('/')[0] + '2', + 'end':str(address_cidr).split('/')[0] + '70'}]} + if slaac: + subnet = self.create_topology_subnet(subnet_name, network, + subnets_client=subnet_client, + routers_client=self.cmgr_adm.routers_client, + router_id=router['id'], + ip_version=6, ipv6_ra_mode='slaac', + ipv6_address_mode='slaac', + **allocation_pools) + else: + subnet = self.create_topology_subnet(subnet_name, network, + subnets_client=subnet_client, + ip_version=6, enable_dhcp=False, + **allocation_pools) + return subnet + + def _create_mac_learn_enabled_port(self, slaac=False): + # Create Port with required port security/sec groups config + rtr_name = data_utils.rand_name("mac-router-") + router = self.create_topology_router( + rtr_name, routers_client=self.cmgr_adm.routers_client) + name = data_utils.rand_name("qos-network") + networks_client = self.cmgr_adm.networks_client + network = self.create_topology_network(name, + networks_client=networks_client) + self._create_ipv6_subnet(network, router, slaac) + test_port_name = data_utils.rand_name('port-') + port_client = self.cmgr_adm.ports_client + body = self.create_topology_port(network=network, + ports_client=port_client, mac_learning_enabled=True, + port_security_enabled=False, security_groups=[], + name=test_port_name) + return body['port'] + + def _update_port_disable_mac_learning(self, port, sec_groups=None): + if sec_groups is None: + sec_groups = [] + updated_sec_grp = port['security_groups'] + sec_groups + body = self.update_topology_port(port['id'], + ports_client=self.cmgr_adm.ports_client, + mac_learning_enabled=False, + port_security_enabled=True, + security_groups=updated_sec_grp) + return body['port'] + + def _update_port_enable_mac_learning(self, port): + body = self.update_topology_port(port['id'], + ports_client=self.cmgr_adm.ports_client, + mac_learning_enabled=True, + port_security_enabled=False, + security_groups=[]) + return body['port'] + + def _conv_switch_prof_to_dict(self, switch_profiles): + switch_prof_dict = {} + for i in range(len(switch_profiles)): + switch_prof_dict.update( + {switch_profiles[i]['key']: switch_profiles[i]['value']}) + return switch_prof_dict + + def _get_nsx_mac_learning_enabled(self, port): + mac_learn_set_bool = False + # Get nsxv3 port(expects 'name' set) + nsx_port = self.nsx.get_logical_port(port['name']) + # Get list of logical port's switch profiles + port_swtch_profs = nsx_port['switching_profile_ids'] + # Convert switch profiles list to dict, key:UUID + port_sw_prof_dict = self._conv_switch_prof_to_dict(port_swtch_profs) + # Get MAC learning switch profile ID + mac_sw_prof_id = port_sw_prof_dict[constants.MAC_SW_PROFILE] + # Get MAC learning switch profile json + mac_sw_profile_json = self.nsx.get_switching_profile(mac_sw_prof_id) + # Get mac-learning state for port + if ('mac_learning' in mac_sw_profile_json): + nsxport_mac_learning = mac_sw_profile_json[ + 'mac_learning']['enabled'] + if nsxport_mac_learning: + mac_learn_set_bool = True + return mac_learn_set_bool + + def _check_mac_learning(self, port, mac_learn_state=True): + # Enabling MAC Learning requires port security=False and no sec grps + nsxport_mac_learning = self._get_nsx_mac_learning_enabled(port) + if mac_learn_state: + self.assertEmpty(port['security_groups'], + "Sec grp for mac learn port is not empty") + self.assertFalse(port['port_security_enabled'], + "Port security is enabled") + self.assertTrue(port['mac_learning_enabled'], + "Mac Learning is not enabled") + self.assertEqual(nsxport_mac_learning, + port['mac_learning_enabled'], + "OS and NSX mac learn states don't match") + else: + self.assertTrue(port['port_security_enabled'], + "Port security is disabled") + if 'mac_learning_enabled' in port.keys(): + self.assertFalse(port['mac_learning_enabled'], + "Mac Learning is enabled") + self.assertEqual(nsxport_mac_learning, + port['mac_learning_enabled'], + "OS and NSX mac learn states don't match") + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('811e18ab-5767-4185-8f5d-663ff19aa335') + def test_create_update_delete_mac_learning_port_static_ipv6(self): + """ + CRUD Workflow 3 + Create port with MAC Learning enabled + Update port(non-MAC Learning settings) + Delete port + """ + test_port = self._create_mac_learn_enabled_port() + new_port_name = data_utils.rand_name('updated_port-') + body = self.update_topology_port(test_port['id'], + ports_client=self.cmgr_adm.ports_client, + name=new_port_name) + updated_port = body['port'] + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + updated_nsx_port = self.nsx.get_logical_port(updated_port['name']) + self.assertEqual(updated_nsx_port['display_name'], + updated_port['name'], + "Updated port names do not match OS and NSX") + self.delete_topology_port(test_port['id'], + ports_client=self.cmgr_adm.ports_client) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNone(self.nsx.get_logical_port(updated_port['name']), + "Logical port %s is not None" % updated_port['name']) + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('cf0791c9-88c7-461c-9d86-429cac2cddc8') + def test_create_update_delete_mac_learning_port_slaac_ipv6(self): + """ + CRUD Workflow 3 + Create port with MAC Learning enabled + Update port(non-MAC Learning settings) + Delete port + """ + test_port = self._create_mac_learn_enabled_port(slaac=True) + new_port_name = data_utils.rand_name('updated_port-') + body = self.update_topology_port(test_port['id'], + ports_client=self.cmgr_adm.ports_client, + name=new_port_name) + updated_port = body['port'] + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + updated_nsx_port = self.nsx.get_logical_port(updated_port['name']) + self.assertEqual(updated_nsx_port['display_name'], + updated_port['name'], + "Updated port names do not match OS and NSX") + self.delete_topology_port(test_port['id'], + ports_client=self.cmgr_adm.ports_client) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNone(self.nsx.get_logical_port(updated_port['name']), + "Logical port %s is not None" % updated_port['name']) + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('e1af7c44-f859-4d88-b0eb-7c863f149868') + def test_toggle_mac_learning_port_static_ipv6_delete(self): + """ + CRUD Workflow 2 + Create port with MAC Learning enabled + Update port, disabling MAC Learning + Update port, re-enabling MAC Learning + Delete port + """ + test_port = self._create_mac_learn_enabled_port() + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self._check_mac_learning(test_port, mac_learn_state=True) + ml_off_port = self._update_port_disable_mac_learning(test_port) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self._check_mac_learning(ml_off_port, mac_learn_state=False) + ml_on_port = self._update_port_enable_mac_learning(ml_off_port) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self._check_mac_learning(ml_on_port, mac_learn_state=True) + self.delete_topology_port(ml_on_port['id'], + ports_client=self.cmgr_adm.ports_client) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNone(self.nsx.get_logical_port(ml_on_port['name']), + "Port %s is not None" % ml_on_port['name']) + + @decorators.attr(type='nsxv3') + @decorators.idempotent_id('8f822bc6-9801-4f6f-acda-6dec3a7bb443') + def test_toggle_mac_learning_port_slaac_ipv6_delete(self): + """ + CRUD Workflow 2 + Create port with MAC Learning enabled + Update port, disabling MAC Learning + Update port, re-enabling MAC Learning + Delete port + """ + test_port = self._create_mac_learn_enabled_port(slaac=True) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self._check_mac_learning(test_port, mac_learn_state=True) + ml_off_port = self._update_port_disable_mac_learning(test_port) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self._check_mac_learning(ml_off_port, mac_learn_state=False) + ml_on_port = self._update_port_enable_mac_learning(ml_off_port) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self._check_mac_learning(ml_on_port, mac_learn_state=True) + self.delete_topology_port(ml_on_port['id'], + ports_client=self.cmgr_adm.ports_client) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNone(self.nsx.get_logical_port(ml_on_port['name']), + "Port %s is not None" % ml_on_port['name']) diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_qos.py b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_qos.py new file mode 100644 index 0000000..b5beec4 --- /dev/null +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_ipv6_qos.py @@ -0,0 +1,192 @@ +# Copyright 2019 VMware Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log as logging + +from tempest import config +from tempest.lib.common.utils import data_utils +from tempest.lib import decorators +from tempest.lib import exceptions +from tempest import test + +from vmware_nsx_tempest_plugin.lib import feature_manager + + +CONF = config.CONF +CONF.validation.auth_method = 'None' + +LOG = logging.getLogger(__name__) + +DSCP_MARK = 12 +DSCP_MARK_UPDATED = 16 +BW_VALUE_KBPS = 1024 +BW_VALUE_MBPS = 1 +UPDATED_BW_VALUE_KBPS = 2048 +UPDATED_BW_VALUE_MBPS = 2 +MAX_BURST_KBPS = 1024000 +MAX_BURST_MBPS = 1 + + +class TestQos(feature_manager.FeatureManager): + + @classmethod + def skip_checks(cls): + super(TestQos, cls).skip_checks() + if not (CONF.network.project_networks_reachable or + CONF.network.public_network_id): + msg = ('Either project_networks_reachable must be "true", or ' + 'public_network_id must be defined.') + raise cls.skipException(msg) + if not CONF.network.public_network_cidr: + msg = "public_network_cidr must be defined in network section." + raise cls.skipException(msg) + if not test.is_extension_enabled('qos', 'network'): + msg = "q-qos extension not enabled." + raise cls.skipException(msg) + if not (CONF.network_feature_enabled.ipv6 and + CONF.network_feature_enabled.ipv6_subnet_attributes): + raise cls.skipException('IPv6 or its attributes not supported') + + @classmethod + def setup_clients(cls): + super(TestQos, cls).setup_clients() + cls.cmgr_adm = cls.get_client_manager('admin') + + @classmethod + def resource_setup(cls): + super(TestQos, cls).resource_setup() + + def _create_ipv6_subnet(self, network, router, slaac=False): + subnet_client = self.cmgr_adm.subnets_client + subnet_name = network['name'] + 'ipv6-sub' + address_cidr = CONF.network.project_network_v6_cidr + address_prefixlen = CONF.network.project_network_v6_mask_bits + if ((address_prefixlen >= 126)): + msg = ("Subnet %s isn't large enough for the test" % address_cidr) + raise exceptions.InvalidConfiguration(msg) + allocation_pools = {'allocation_pools': [{ + 'start': str(address_cidr).split('/')[0] + '2', + 'end':str(address_cidr).split('/')[0] + '70'}]} + if slaac: + subnet = self.create_topology_subnet(subnet_name, network, + subnets_client=subnet_client, + routers_client=self.cmgr_adm.routers_client, + router_id=router['id'], + ip_version=6, ipv6_ra_mode='slaac', + ipv6_address_mode='slaac', + **allocation_pools) + else: + subnet = self.create_topology_subnet(subnet_name, network, + subnets_client=subnet_client, + ip_version=6, enable_dhcp=False, + **allocation_pools) + return subnet + + def _create_single_ipv6_rtr_topology(self, slaac=False): + """Create IPv6 subnet and attach them to a router + """ + rtr_name = data_utils.rand_name("qos-rtr") + router = self.create_topology_router( + rtr_name, routers_client=self.cmgr_adm.routers_client) + name = data_utils.rand_name("qos-network") + networks_client = self.cmgr_adm.networks_client + network = self.create_topology_network(name, + networks_client=networks_client) + self._create_ipv6_subnet(network, router, slaac) + return network, router + + def _create_qos_policy(self): + """Create a qos policy with bandwidth limit rule + and dscp rule + """ + name = data_utils.rand_name('test-qos-policy-') + policy = self.create_qos_policy(name, + description='bw_dscp_rule', + shared=False) + # add bw rule + self.create_bandwidth_limit_rule( + policy_id=policy['id'], max_kbps=BW_VALUE_KBPS, + max_burst_kbps=MAX_BURST_KBPS) + # add dscp rule + self.create_dscp_marking_rule( + policy_id=policy['id'], dscp_mark=DSCP_MARK) + return policy + + @decorators.attr(type=['nsxv3', 'positive']) + @decorators.idempotent_id('b46d699f-9bd5-4430-a816-167ffed66551') + def test_qos_policy_assoc_ipv6_subnet_static_ip(self): + """Create network with IPv6 static assignment based subnet + Associate qos policy to the network + """ + network, _ = self._create_single_ipv6_rtr_topology() + policy = self. _create_qos_policy() + self.cmgr_adm.networks_client.update_network( + network['id'], qos_policy_id=policy['id']) + updated_network = self.cmgr_adm.networks_client.show_network( + network['id']) + self.assertEqual(updated_network['network']['qos_policy_id'], + policy['id']) + + @decorators.attr(type=['nsxv3', 'positive']) + @decorators.idempotent_id('a121d38e-d977-4bd0-b7db-6f7d9e206f7a') + def test_qos_policy_assoc_ipv6_subnet_slaac_ip(self): + """Create network with IPv6 SLAAC subnet + Associate qos policy to the network + """ + policy = self. _create_qos_policy() + network, _ = self._create_single_ipv6_rtr_topology(slaac=True) + self.cmgr_adm.networks_client.update_network( + network['id'], qos_policy_id=policy['id']) + updated_network = self.cmgr_adm.networks_client.show_network( + network['id']) + self.assertEqual(updated_network['network']['qos_policy_id'], + policy['id']) + + @decorators.attr(type=['nsxv3', 'positive']) + @decorators.idempotent_id('7a6a2088-612f-4fc6-87be-d58c5d04b946') + def test_qos_policy_assoc_ipv6_port_static_ip(self): + """Create network with IPv6 static assignment based subnet + Associate qos policy to a port attached to the ipv6 + subnet + """ + network, _ = self._create_single_ipv6_rtr_topology() + policy = self. _create_qos_policy() + port_client = self.cmgr_adm.ports_client + body = self.create_topology_port(network=network, + ports_client=port_client, qos_policy_id=policy['id']) + port = body['port'] + body = self.show_topology_port(port['id'], + ports_client=port_client) + port = body['port'] + self.assertEqual(port['qos_policy_id'], + policy['id']) + + @decorators.attr(type=['nsxv3', 'positive']) + @decorators.idempotent_id('f6297a98-f487-4403-852b-61fd0fce329c') + def test_qos_policy_assoc_ipv6_port_slaac_ip(self): + """Create network with IPv6 SLAAC subnet + Associate qos policy to a port attached + to the ipv6 subnet + """ + policy = self. _create_qos_policy() + network, _ = self._create_single_ipv6_rtr_topology(slaac=True) + port_client = self.cmgr_adm.ports_client + self.create_topology_port(network=network, + ports_client=port_client) + self.cmgr_adm.networks_client.update_network( + network['id'], qos_policy_id=policy['id']) + updated_network = self.cmgr_adm.networks_client.show_network( + network['id']) + self.assertEqual(updated_network['network']['qos_policy_id'], + policy['id'])