From f530d93864c3637973061b9e15b4bb682e67f952 Mon Sep 17 00:00:00 2001 From: Puneet Arora Date: Tue, 19 Jul 2016 19:17:49 +0000 Subject: [PATCH] Tempest: Providing support for disable spoofguard feature Added scenario testcase for disable spoofguard feature. Change-Id: Ifd741e7fbc4a5fa6a577864bdf98eaad9082b876 --- vmware_nsx_tempest/services/nsxv_client.py | 19 + .../nsxv/scenario/network_addon_methods.py | 13 + .../nsxv/scenario/test_spoofguard_policy.py | 348 ++++++++++++++++++ 3 files changed, 380 insertions(+) create mode 100644 vmware_nsx_tempest/tests/nsxv/scenario/test_spoofguard_policy.py diff --git a/vmware_nsx_tempest/services/nsxv_client.py b/vmware_nsx_tempest/services/nsxv_client.py index 7010d2e921..eec651aa9b 100644 --- a/vmware_nsx_tempest/services/nsxv_client.py +++ b/vmware_nsx_tempest/services/nsxv_client.py @@ -260,6 +260,25 @@ class VSMClient(object): response = self.get() return response + def get_excluded_vm_name_list(self): + """Get excluded vm's list info from beckend. + + After disabling port security of vm port, vm will get added + in exclude list.This method returns the list of vm's present + in exclude list. + Returns exclude list of vm's name. + """ + self.__set_api_version('2.1') + self.__set_endpoint('/app/excludelist') + response = self.get() + response_list = [] + exclude_list = [] + response_list = response.json()[ + 'excludeListConfigurationDto']['excludeMembers'] + exclude_list = [member['member']['name'] for member in response_list + if member['member']['name']] + return exclude_list + def get_dhcp_edge_info(self): """Get dhcp edge info. diff --git a/vmware_nsx_tempest/tests/nsxv/scenario/network_addon_methods.py b/vmware_nsx_tempest/tests/nsxv/scenario/network_addon_methods.py index 09ee133e13..39403ac7e6 100644 --- a/vmware_nsx_tempest/tests/nsxv/scenario/network_addon_methods.py +++ b/vmware_nsx_tempest/tests/nsxv/scenario/network_addon_methods.py @@ -178,6 +178,19 @@ def create_network(SELF, client=None, tenant_id=None, name=None, **kwargs): return net_network +def create_port(SELF, client=None, **kwargs): + if not client: + client = SELF.port_client + result = client.create_port(**kwargs) + net_port = result['port'] + SELF.assertIsNotNone(result, 'Unable to allocate port') + SELF.addCleanup(test_utils.call_and_ignore_notfound_exc, + client.delete_port, + net_port['id']) + + return net_port + + # gateway=None means don't set gateway_ip in subnet def create_subnet(SELF, network, client=None, gateway='', cidr=None, mask_bits=None, diff --git a/vmware_nsx_tempest/tests/nsxv/scenario/test_spoofguard_policy.py b/vmware_nsx_tempest/tests/nsxv/scenario/test_spoofguard_policy.py new file mode 100644 index 0000000000..846586e09b --- /dev/null +++ b/vmware_nsx_tempest/tests/nsxv/scenario/test_spoofguard_policy.py @@ -0,0 +1,348 @@ +# Copyright 2016 OpenStack Foundation +# Copyright 2016 VMware Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +from tempest import config +from tempest.lib.common.utils import data_utils +from tempest import test + +from vmware_nsx_tempest._i18n import _LI +from vmware_nsx_tempest.services import nsxv_client +from vmware_nsx_tempest.tests.nsxv.scenario import ( + manager_topo_deployment as dmgr) +from vmware_nsx_tempest.tests.nsxv.scenario import ( + network_addon_methods as HELO) + +CONF = config.CONF +LOG = dmgr.manager.log.getLogger(__name__) + + +class TestSpoofGuardBasicOps(dmgr.TopoDeployScenarioManager): + """Base class provides Spoof Guard basic operations. + + 1) Create network, subnet and port + 2) Boot an instance using network. + 2) Ssh to instance and then check below information: + a) check exclude list whether vm exists in exclude list or not + b) update port-security to disable and check vm exists in exclude list + or not + c) Launch multiple instances anc checks their existence in exclude list + with port-security disabled/enabled. + d) Launch instances and check ping packets between various vm's with + port-security disabled/enabled. + e) Enabled/disablling of network and check behavior w.r.t. port in that + network. + 3) Check at beckend(nsx-v) for exclude list. + """ + + @classmethod + def skip_checks(cls): + super(TestSpoofGuardBasicOps, cls).skip_checks() + manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}", + CONF.nsxv.manager_uri).group(0) + cls.vsm = nsxv_client.VSMClient( + manager_ip, CONF.nsxv.user, CONF.nsxv.password) + nsxv_version = cls.vsm.get_vsm_version() + # Raise skip testcase exception if nsx-v version is less than 6.2.3 + if (nsxv_version and nsxv_version < '6.2.3'): + msg = ('NSX-v version should be greater than or equal to 6.2.3') + raise cls.skipException(msg) + + @classmethod + def resource_setup(cls): + super(TestSpoofGuardBasicOps, cls).resource_setup() + + @classmethod + def resource_cleanup(cls): + super(TestSpoofGuardBasicOps, cls).resource_cleanup() + + def tearDown(self): + self.remove_project_network() + super(TestSpoofGuardBasicOps, self).tearDown() + + def remove_project_network(self): + project_name = 'green' + tenant = getattr(self, project_name, None) + if tenant: + servers_client = tenant['client_mgr'].servers_client + dmgr.delete_all_servers(servers_client) + self.disassociate_floatingip(tenant['fip1'], + and_delete=True) + + def create_project_network_subnet(self, + name_prefix='spoofguard-project'): + network_name = data_utils.rand_name(name_prefix) + network, subnet = self.create_network_subnet( + name=network_name) + return (network['id'], network, subnet) + + def check_server_connected(self, serv): + # Fetch tenant-network from where vm deployed + serv_net = list(serv['addresses'].keys())[0] + serv_addr = serv['addresses'][serv_net][0] + host_ip = serv_addr['addr'] + self.waitfor_host_connected(host_ip) + + def setup_vm_enviornment(self, client_mgr, t_id, + check_outside_world=True, + cidr_offset=0): + t_network, t_subnet, t_router = self.setup_project_network( + self.public_network_id, namestart=("deploy-%s-spoofuard" % t_id), + cidr_offset=0) + t_security_group = self._create_security_group( + security_groups_client=self.security_groups_client, + security_group_rules_client=self.security_group_rules_client, + namestart='adm') + username, password = self.get_image_userpass() + security_groups = [{'name': t_security_group['id']}] + t_serv1 = self.create_server_on_network( + t_network, security_groups, + image=self.get_server_image(), + flavor=self.get_server_flavor(), + name=t_network['name']) + self.check_server_connected(t_serv1) + t_floatingip = self.create_floatingip_for_server( + t_serv1, client_mgr=self.admin_manager) + msg = ("Associate t_floatingip[%s] to server[%s]" + % (t_floatingip, t_serv1['name'])) + self._check_floatingip_connectivity( + t_floatingip, t_serv1, should_connect=True, msg=msg) + vm_enviornment = dict(security_group=t_security_group, + network=t_network, subnet=t_subnet, + router=t_router, client_mgr=client_mgr, + serv1=t_serv1, fip1=t_floatingip) + return vm_enviornment + + def get_port_id(self, port_client, vm_info): + tenant_name = vm_info['name'] + fixed_ip = vm_info['addresses'][tenant_name][0]['addr'] + list_ports = port_client.list_ports() + list_ports_extract = list_ports['ports'] + for port in list_ports_extract: + if port['fixed_ips'][0]['ip_address'] == fixed_ip: + port_id = port['id'] + return port_id + + +class TestSpoofGuardFeature(TestSpoofGuardBasicOps): + @test.attr(type='nsxv') + @test.idempotent_id('2804f55d-3221-440a-9fa8-ab16a8932634') + def test_exclude_list_with_new_attach_port(self): + port_client = self.manager.ports_client + self.green = self.setup_vm_enviornment(self.manager, 'green', True) + vm_id = self.green['serv1']['id'] + net_id = self.green['network']['id'] + name = 'disabled-port-security-port' + kwargs = {'name': name, 'network_id': net_id, + 'port_security_enabled': 'false'} + # Create Port + port = HELO.create_port(self, client=port_client, **kwargs) + port_id = port['id'] + kwargs = {'port_id': port_id} + # Attach interface to vm + self.interface_client.create_interface(vm_id, **kwargs) + # Fetch exclude list information from beckend + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + for exclude_vm in exclude_list: + if vm_id in exclude_vm: + LOG.info(_LI("Vm in exclude list")) + # Update Port security to disabled + port_client.update_port( + port_id=port_id, + port_security_enabled='true') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + if exclude_vm in exclude_list: + if vm_id not in exclude_vm: + LOG.info(_LI("Vm not in exclude list")) + # Detach interface from vm + self.interface_client.delete_interface(vm_id, port_id) + + @test.attr(type='nsxv') + @test.idempotent_id('38c213df-bfc2-4681-9c9c-3a31c05b0e6f') + def test_exclude_with_multiple_vm(self): + image = self.get_server_image() + flavor = self.get_server_flavor() + port_client = self.manager.ports_client + self.green = self.setup_vm_enviornment(self.manager, 'green', True) + vm_id = self.green['serv1']['id'] + security_groups = [{'name': self.green['security_group']['id']}] + # Boot instance vm2 + t_serv2 = self.create_server_on_network( + self.green['network'], security_groups, + image=image, + flavor=flavor, + name=self.green['network']['name']) + # Boot instance vm3 + t_serv3 = self.create_server_on_network( + self.green['network'], security_groups, + image=self.get_server_image(), + flavor=self.get_server_flavor(), + name=self.green['network']['name']) + self.check_server_connected(t_serv2) + port1_id = self.green['fip1']['port_id'] + port2_id = self.get_port_id(port_client=port_client, vm_info=t_serv2) + port3_id = self.get_port_id(port_client=port_client, vm_info=t_serv3) + # Update vm1 port to disbale port security + port_client.update_port( + port_id=port1_id, + port_security_enabled='false') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + for exclude_vm in exclude_list: + if vm_id in exclude_vm: + LOG.info(_LI("Vm1 in exclude list")) + vm2_id = t_serv2['id'] + # Update vm2 port to disable port security + port_client.update_port( + port_id=port2_id, + port_security_enabled='false') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + # Check vm2 in exclude list or not + for exclude_vm in exclude_list: + if vm2_id in exclude_vm: + LOG.info(_LI("Vm2 in exclude list")) + vm3_id = t_serv3['id'] + # Update vm3 port to enable port security + port_client.update_port( + port_id=port3_id, + port_security_enabled='false') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + # Check vm3 in exclude list or not + for exclude_vm in exclude_list: + if vm3_id in exclude_vm: + LOG.info(_LI("Vm3 in exclude list")) + # Update vm1 port to enable port security + port_client.update_port( + port_id=port1_id, + port_security_enabled='true') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + # Check vm should not be in exclude list + for exclude_vm in exclude_list: + if vm_id not in exclude_vm: + LOG.info(_LI("Vm1 not in exclude list")) + + @test.attr(type='nsxv') + @test.idempotent_id('f034d3e9-d717-4bcd-8e6e-18e9ada7b81a') + def test_exclude_list_with_single_vm_port(self): + port_client = self.manager.ports_client + self.green = self.setup_vm_enviornment(self.manager, 'green', True) + port_id = self.green['fip1']['port_id'] + # Update vm port to disable port security + port_client.update_port( + port_id=port_id, + port_security_enabled='false') + vm_id = self.green['serv1']['id'] + # Check vm in exclude list or not + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + for exclude_vm in exclude_list: + if vm_id in exclude_vm: + LOG.info(_LI("Vm in exclude list")) + port_client.update_port( + port_id=port_id, + port_security_enabled='true') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + if exclude_vm in exclude_list: + if vm_id not in exclude_vm: + LOG.info(_LI("Vm not in exclude list")) + self.interface_client.delete_interface(vm_id, port_id) + + @test.attr(type='nsxv') + @test.idempotent_id('3ad04e37-2a9f-4465-86e7-94993eecdfa1') + def test_disabled_network_port_security(self): + network_client = self.manager.networks_client + port_client = self.manager.ports_client + net_id, network, subnet =\ + self.create_project_network_subnet('admin') + kwargs = {'port_security_enabled': 'false'} + # Update network to disbale port security + network_client.update_network(network_id=net_id, **kwargs) + name = 'disabled-port-security-port' + kwargs = {'name': name, 'network_id': net_id} + # Create port under network + port = HELO.create_port(self, client=port_client, **kwargs) + port_id = port['id'] + # Check port security of created port + port_details = port_client.show_port(port_id=port_id) + if (port_details['port']['port_security_enabled'] == 'false'): + LOG.info(_LI("Port security of port is disabled")) + kwargs = {'port_security_enabled': 'true'} + # Update port security of network to enabled + network_client.update_network(network_id=net_id, **kwargs) + name = 'disabled-port-security-port' + kwargs = {'name': name, 'network_id': net_id} + port = HELO.create_port(self, client=port_client, **kwargs) + port_id = port['id'] + port_details = port_client.show_port(port_id=port_id) + if (port_details['port']['port_security_enabled'] == 'true'): + LOG.info(_LI("Port security of port is enabled")) + + @test.attr(type='nsxv') + @test.idempotent_id('c8683cb7-4be5-4670-95c6-344a0aea3667') + def test_exclude_list_with_multiple_ports(self): + port_client = self.manager.ports_client + self.green = self.setup_vm_enviornment(self.manager, 'green', True) + vm_id = self.green['serv1']['id'] + net_id = self.green['network']['id'] + name = 'disabled-port-security-port1' + kwargs = {'name': name, 'network_id': net_id, + 'port_security_enabled': 'false'} + port1 = HELO.create_port(self, client=port_client, **kwargs) + port2 = HELO.create_port(self, client=port_client, **kwargs) + port1_id = port1['id'] + kwargs = {'port_id': port1_id} + self.interface_client.create_interface(vm_id, **kwargs) + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + for exclude_vm in exclude_list: + if vm_id in exclude_vm: + LOG.info(_LI("Vm in exclude list")) + name = 'disabled-port-security-port2' + kwargs = {'name': name, 'network_id': net_id, + 'port_security_enabled': 'false'} + port2_id = port2['id'] + kwargs = {'port_id': port2_id} + self.interface_client.create_interface(vm_id, **kwargs) + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + for exclude_vm in exclude_list: + if vm_id in exclude_vm: + LOG.info(_LI("Vm in exclude list")) + port_client.update_port( + port_id=port2_id, + port_security_enabled='true') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + for exclude_vm in exclude_list: + if vm_id in exclude_vm: + LOG.info(_LI("Vm in exclude list")) + port_client.update_port( + port_id=port1_id, + port_security_enabled='true') + items = self.vsm.get_excluded_vm_name_list() + exclude_list = [item.encode('utf-8') for item in items] + if exclude_vm in exclude_list: + if vm_id not in exclude_vm: + LOG.info(_LI("Vm not in exclude list")) + self.interface_client.delete_interface(vm_id, port1_id) + self.interface_client.delete_interface(vm_id, port2_id)