diff --git a/vmware_nsx_tempest/common/constants.py b/vmware_nsx_tempest/common/constants.py index bdc4f86268..1dff7aec13 100644 --- a/vmware_nsx_tempest/common/constants.py +++ b/vmware_nsx_tempest/common/constants.py @@ -36,3 +36,8 @@ MD_ERROR_CODE_WHEN_LS_BOUNDED = "10026" INTERVAL_BETWEEN_EXEC_RETRY_ON_SSH = 5 MAX_NO_OF_TIMES_EXECUTION_OVER_SSH = 30 MD_BASE_URL = "http://169.254.169.254/" + +# NSXV3 Port Security constants. +NSX_BACKEND_TIME_INTERVAL = 30 +NSX_BACKEND_SMALL_TIME_INTERVAL = 10 +NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5 diff --git a/vmware_nsx_tempest/services/nsxv3_client.py b/vmware_nsx_tempest/services/nsxv3_client.py index 4fdbe79756..12ef1f52db 100644 --- a/vmware_nsx_tempest/services/nsxv3_client.py +++ b/vmware_nsx_tempest/services/nsxv3_client.py @@ -349,6 +349,25 @@ class NSXV3Client(object): """ return self.get_logical_resources("/ns-groups") + def get_neutron_ns_group_id(self): + """ + Retrieve NSGroup Id + """ + nsx_nsgroup = self.get_ns_groups() + for group in nsx_nsgroup: + if group['display_name'] == 'neutron_excluded_port_nsgroup': + nsgroup_id = group['id'] + return nsgroup_id + + def get_ns_group_port_members(self, ns_group_id): + """ + Retrieve NSGroup port members + """ + endpoint = "/ns-groups/%s/effective-logical-port-members" % ns_group_id + response = self.get(endpoint=endpoint) + res_json = response.json() + return res_json + def get_ns_group(self, os_name, os_uuid): """ Get the NSGroup based on the name provided. diff --git a/vmware_nsx_tempest/tests/nsxv3/api/test_nsx_port_security.py b/vmware_nsx_tempest/tests/nsxv3/api/test_nsx_port_security.py new file mode 100644 index 0000000000..af9849ebec --- /dev/null +++ b/vmware_nsx_tempest/tests/nsxv3/api/test_nsx_port_security.py @@ -0,0 +1,276 @@ +# Copyright 2017 VMware Inc +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from tempest.api.network import base +from tempest import config +from tempest.lib.common.utils import test_utils +from tempest.lib import decorators +from tempest.lib import exceptions + +from tempest import test +from vmware_nsx_tempest.common import constants +from vmware_nsx_tempest.services import nsxv3_client + +CONF = config.CONF + + +class NSXv3PortSecurity(base.BaseAdminNetworkTest): + """Test Port security of Port + + 1. Create/Update port with port security enable and disable. + 2. Create/Update port security and check at beckend NSGroup. + 3. Check non admin tenant shouldn't update port security of admin port. + 4. Check non-admin tenant can't delete port security of admin port. + """ + + @classmethod + def skip_checks(cls): + super(NSXv3PortSecurity, cls).skip_checks() + if not test.is_extension_enabled('port-security-enabled', 'network'): + msg = "Extension port-security-enabled is not enabled." + raise cls.skipException(msg) + + @classmethod + def setup_clients(cls): + super(NSXv3PortSecurity, cls).setup_clients() + cls.cmgr_pri = cls.get_client_manager('primary') + cls.cmgr_alt = cls.get_client_manager('alt') + cls.cmgr_adm = cls.get_client_manager('admin') + + @classmethod + def resource_setup(cls): + super(NSXv3PortSecurity, cls).resource_setup() + cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) + cls.network = cls.create_network() + + def get_tag_port_id(self, nsxgroup_data, org_port_id): + """ + Method to get port of neutron corresponding to backend port-id + """ + for ports in self.nsx.get_logical_ports(): + for port_id in nsxgroup_data['results']: + if ports['display_name'] == port_id['target_display_name']: + for tag in ports['tags']: + if tag['scope'] == "os-neutron-port-id" and \ + tag['tag'] == org_port_id: + corresponding_port_id = ports['display_name'] + return corresponding_port_id + + def check_port_not_exists_in_os_group(self, nsxgroup_data, + corresponding_port_id): + """ + Method to check neutron port not exists in neutron OSGroup + """ + if nsxgroup_data['results'] != []: + for port_id in nsxgroup_data['results']: + if corresponding_port_id != port_id['target_display_name']: + continue + else: + return False + return True + else: + return False + + def check_port_exists_in_os_group(self, nsxgroup_data, + corresponding_port_id): + """ + Method to check neutron port exists in neutron OSGroup + """ + for port_id in nsxgroup_data['results']: + if corresponding_port_id == port_id['target_display_name']: + return True + + def _create_network_topo(self, client): + """ + Method to create network topology which includes network, subnet + and port + """ + net_client = client.networks_client + body = {'name': 'port-security-network'} + network = net_client.create_network(**body) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + net_client.delete_network, + network['network']['id']) + body = {"network_id": network['network']['id'], + "allocation_pools": [{"start": "2.0.0.2", + "end": "2.0.0.254"}], + "ip_version": 4, "cidr": "2.0.0.0/24"} + subnet_client = client.subnets_client + subnet = subnet_client.create_subnet(**body) + body = {"network_id": network['network']['id'], + "admin_state_up": "true", + "port_security_enabled": "false", "security_groups": []} + port_client = client.ports_client + port = port_client.create_port(**body) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + port_client.delete_port, + port['port']['id']) + network_topo = dict(network=network, subnet=subnet, port=port) + return network_topo + + def _create_security_group_and_return_id(self, client): + """ + Method to create security group and return id + """ + security_client = client.security_groups_client + create_body = security_client.create_security_group(name='sec-group') + secgroup = create_body['security_group'] + # Sleep for 5 sec + time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL) + secgroup_id = secgroup['id'] + return secgroup_id + + @test.attr(type='nsxv3') + @decorators.idempotent_id('50203701-1cda-4f31-806d-7a51514b9664') + def test_create_port_with_security_enabled_check_in_neutron_database(self): + secgroup_id = self._create_security_group_and_return_id(self.cmgr_adm) + network_topo = self._create_network_topo(self.cmgr_adm) + port_client = self.cmgr_adm.ports_client + port_id = network_topo['port']['port']['id'] + port_detail = port_client.show_port(port_id) + self.assertEqual(False, port_detail['port']["port_security_enabled"]) + body = {"port_security_enabled": "true", + "security_groups": [secgroup_id]} + port_client.update_port(port_id, **body) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + port_detail = port_client.show_port(port_id) + self.assertEqual(True, port_detail['port']["port_security_enabled"]) + + @test.attr(type='nsxv3') + @decorators.idempotent_id('4b765fa2-345f-4d2c-928f-ad4b347936fd') + def test_create_port_with_security_enabled_check_at_beckend(self): + net_client = self.cmgr_adm.networks_client + body = {'name': 'port-security-network'} + network = net_client.create_network(**body) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + net_client.delete_network, + network['network']['id']) + body = {"network_id": network['network']['id'], + "allocation_pools": [{"start": "2.0.0.2", + "end": "2.0.0.254"}], + "ip_version": 4, "cidr": "2.0.0.0/24"} + subnet_client = self.cmgr_adm.subnets_client + subnet_client.create_subnet(**body) + body = {"network_id": network['network']['id'], + "admin_state_up": "true", + "port_security_enabled": "false", "security_groups": []} + port_client = self.cmgr_adm.ports_client + port_id = port_client.create_port(**body) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + self.addCleanup(test_utils.call_and_ignore_notfound_exc, + port_client.delete_port, + port_id['port']['id']) + port_detail = port_client.show_port(port_id['port']['id']) + self.assertEqual(False, port_detail['port']["port_security_enabled"]) + org_port_id = port_id['port']['id'] + nsgroup_id = self.nsx.get_neutron_ns_group_id() + nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + corresponding_port_id = self.get_tag_port_id(nsxgroup_data, + org_port_id) + status = self.check_port_exists_in_os_group(nsxgroup_data, + corresponding_port_id) + self.assertEqual(True, status) + + @test.attr(type='nsxv3') + @decorators.idempotent_id('bcafeb10-fbf1-4c97-9e4f-50e56d32bdcf') + def test_non_admin_cannot_update_admin_port_with_security(self): + network_topo = self._create_network_topo(self.cmgr_adm) + tenant_port_client = self.cmgr_alt.ports_client + kwargs = {"port_security_enabled": "true"} + self.assertRaises(exceptions.NotFound, + tenant_port_client.update_port, + network_topo['port']['port']['id'], + **kwargs) + + @test.attr(type='nsxv3') + @decorators.idempotent_id('cf4b8d28-13c4-4339-993f-66070708e914') + def test_non_admin_cannot_delete_tenant_port_with_port_security(self): + network_topo = self._create_network_topo(self.cmgr_pri) + tenant_port_client = self.cmgr_alt.ports_client + self.assertRaises(exceptions.NotFound, + tenant_port_client.delete_port, + network_topo['port']['port']['id']) + + @test.attr(type='nsxv3') + @decorators.idempotent_id('ee6213ac-dfcd-401b-bbc6-03afd26f203a') + def test_tenant_port_security_at_beckend_after_enable_disable(self): + secgroup_id = self._create_security_group_and_return_id(self.cmgr_alt) + network_topo = self._create_network_topo(self.cmgr_alt) + port_client = self.cmgr_alt.ports_client + kwargs = {"port_security_enabled": "false", "security_groups": []} + org_port_id = network_topo['port']['port']['id'] + port_client.update_port(org_port_id, + **kwargs) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() + nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) + corresponding_port_id = self.get_tag_port_id(nsxgroup_data, + org_port_id) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + status = self.check_port_exists_in_os_group(nsxgroup_data, + corresponding_port_id) + self.assertEqual(True, status) + kwargs = {"port_security_enabled": "true", + "security_groups": [secgroup_id]} + port_client.update_port(org_port_id, + **kwargs) + time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() + nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) + status = self.check_port_not_exists_in_os_group(nsxgroup_data, + corresponding_port_id) + self.assertEqual(False, status) + + @test.attr(type='nsxv3') + @decorators.idempotent_id('c6f4c2f2-3fc9-4983-a05a-bb3a3dc35ad8') + def test_admin_port_security_at_beckend_after_enable_disable(self): + secgroup_id = self._create_security_group_and_return_id(self.cmgr_adm) + network_topo = self._create_network_topo(self.cmgr_adm) + port_client = self.cmgr_adm.ports_client + kwargs = {"port_security_enabled": "false", + "security_groups": []} + org_port_id = network_topo['port']['port']['id'] + port_client.update_port(org_port_id, + **kwargs) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() + nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) + corresponding_port_id = self.get_tag_port_id(nsxgroup_data, + org_port_id) + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + status = self.check_port_exists_in_os_group(nsxgroup_data, + corresponding_port_id) + self.assertEqual(True, status) + kwargs = {"port_security_enabled": "true", + "security_groups": [secgroup_id]} + port_client.update_port(org_port_id, **kwargs) + time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() + nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) + status = self.check_port_not_exists_in_os_group(nsxgroup_data, + corresponding_port_id) + self.assertEqual(False, status)