Merge "Tempest: Providing support for disable spoofguard feature"

This commit is contained in:
Jenkins 2016-08-03 19:26:47 +00:00 committed by Gerrit Code Review
commit 5e284c601c
3 changed files with 380 additions and 0 deletions

View File

@ -260,6 +260,25 @@ class VSMClient(object):
response = self.get()
return response
def get_excluded_vm_name_list(self):
"""Get excluded vm's list info from beckend.
After disabling port security of vm port, vm will get added
in exclude list.This method returns the list of vm's present
in exclude list.
Returns exclude list of vm's name.
"""
self.__set_api_version('2.1')
self.__set_endpoint('/app/excludelist')
response = self.get()
response_list = []
exclude_list = []
response_list = response.json()[
'excludeListConfigurationDto']['excludeMembers']
exclude_list = [member['member']['name'] for member in response_list
if member['member']['name']]
return exclude_list
def get_dhcp_edge_info(self):
"""Get dhcp edge info.

View File

@ -178,6 +178,19 @@ def create_network(SELF, client=None, tenant_id=None, name=None, **kwargs):
return net_network
def create_port(SELF, client=None, **kwargs):
if not client:
client = SELF.port_client
result = client.create_port(**kwargs)
net_port = result['port']
SELF.assertIsNotNone(result, 'Unable to allocate port')
SELF.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_port,
net_port['id'])
return net_port
# gateway=None means don't set gateway_ip in subnet
def create_subnet(SELF, network, client=None,
gateway='', cidr=None, mask_bits=None,

View File

@ -0,0 +1,348 @@
# Copyright 2016 OpenStack Foundation
# Copyright 2016 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest import test
from vmware_nsx_tempest._i18n import _LI
from vmware_nsx_tempest.services import nsxv_client
from vmware_nsx_tempest.tests.nsxv.scenario import (
manager_topo_deployment as dmgr)
from vmware_nsx_tempest.tests.nsxv.scenario import (
network_addon_methods as HELO)
CONF = config.CONF
LOG = dmgr.manager.log.getLogger(__name__)
class TestSpoofGuardBasicOps(dmgr.TopoDeployScenarioManager):
"""Base class provides Spoof Guard basic operations.
1) Create network, subnet and port
2) Boot an instance using network.
2) Ssh to instance and then check below information:
a) check exclude list whether vm exists in exclude list or not
b) update port-security to disable and check vm exists in exclude list
or not
c) Launch multiple instances anc checks their existence in exclude list
with port-security disabled/enabled.
d) Launch instances and check ping packets between various vm's with
port-security disabled/enabled.
e) Enabled/disablling of network and check behavior w.r.t. port in that
network.
3) Check at beckend(nsx-v) for exclude list.
"""
@classmethod
def skip_checks(cls):
super(TestSpoofGuardBasicOps, cls).skip_checks()
manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}",
CONF.nsxv.manager_uri).group(0)
cls.vsm = nsxv_client.VSMClient(
manager_ip, CONF.nsxv.user, CONF.nsxv.password)
nsxv_version = cls.vsm.get_vsm_version()
# Raise skip testcase exception if nsx-v version is less than 6.2.3
if (nsxv_version and nsxv_version < '6.2.3'):
msg = ('NSX-v version should be greater than or equal to 6.2.3')
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(TestSpoofGuardBasicOps, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
super(TestSpoofGuardBasicOps, cls).resource_cleanup()
def tearDown(self):
self.remove_project_network()
super(TestSpoofGuardBasicOps, self).tearDown()
def remove_project_network(self):
project_name = 'green'
tenant = getattr(self, project_name, None)
if tenant:
servers_client = tenant['client_mgr'].servers_client
dmgr.delete_all_servers(servers_client)
self.disassociate_floatingip(tenant['fip1'],
and_delete=True)
def create_project_network_subnet(self,
name_prefix='spoofguard-project'):
network_name = data_utils.rand_name(name_prefix)
network, subnet = self.create_network_subnet(
name=network_name)
return (network['id'], network, subnet)
def check_server_connected(self, serv):
# Fetch tenant-network from where vm deployed
serv_net = list(serv['addresses'].keys())[0]
serv_addr = serv['addresses'][serv_net][0]
host_ip = serv_addr['addr']
self.waitfor_host_connected(host_ip)
def setup_vm_enviornment(self, client_mgr, t_id,
check_outside_world=True,
cidr_offset=0):
t_network, t_subnet, t_router = self.setup_project_network(
self.public_network_id, namestart=("deploy-%s-spoofuard" % t_id),
cidr_offset=0)
t_security_group = self._create_security_group(
security_groups_client=self.security_groups_client,
security_group_rules_client=self.security_group_rules_client,
namestart='adm')
username, password = self.get_image_userpass()
security_groups = [{'name': t_security_group['id']}]
t_serv1 = self.create_server_on_network(
t_network, security_groups,
image=self.get_server_image(),
flavor=self.get_server_flavor(),
name=t_network['name'])
self.check_server_connected(t_serv1)
t_floatingip = self.create_floatingip_for_server(
t_serv1, client_mgr=self.admin_manager)
msg = ("Associate t_floatingip[%s] to server[%s]"
% (t_floatingip, t_serv1['name']))
self._check_floatingip_connectivity(
t_floatingip, t_serv1, should_connect=True, msg=msg)
vm_enviornment = dict(security_group=t_security_group,
network=t_network, subnet=t_subnet,
router=t_router, client_mgr=client_mgr,
serv1=t_serv1, fip1=t_floatingip)
return vm_enviornment
def get_port_id(self, port_client, vm_info):
tenant_name = vm_info['name']
fixed_ip = vm_info['addresses'][tenant_name][0]['addr']
list_ports = port_client.list_ports()
list_ports_extract = list_ports['ports']
for port in list_ports_extract:
if port['fixed_ips'][0]['ip_address'] == fixed_ip:
port_id = port['id']
return port_id
class TestSpoofGuardFeature(TestSpoofGuardBasicOps):
@test.attr(type='nsxv')
@test.idempotent_id('2804f55d-3221-440a-9fa8-ab16a8932634')
def test_exclude_list_with_new_attach_port(self):
port_client = self.manager.ports_client
self.green = self.setup_vm_enviornment(self.manager, 'green', True)
vm_id = self.green['serv1']['id']
net_id = self.green['network']['id']
name = 'disabled-port-security-port'
kwargs = {'name': name, 'network_id': net_id,
'port_security_enabled': 'false'}
# Create Port
port = HELO.create_port(self, client=port_client, **kwargs)
port_id = port['id']
kwargs = {'port_id': port_id}
# Attach interface to vm
self.interface_client.create_interface(vm_id, **kwargs)
# Fetch exclude list information from beckend
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
for exclude_vm in exclude_list:
if vm_id in exclude_vm:
LOG.info(_LI("Vm in exclude list"))
# Update Port security to disabled
port_client.update_port(
port_id=port_id,
port_security_enabled='true')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
if exclude_vm in exclude_list:
if vm_id not in exclude_vm:
LOG.info(_LI("Vm not in exclude list"))
# Detach interface from vm
self.interface_client.delete_interface(vm_id, port_id)
@test.attr(type='nsxv')
@test.idempotent_id('38c213df-bfc2-4681-9c9c-3a31c05b0e6f')
def test_exclude_with_multiple_vm(self):
image = self.get_server_image()
flavor = self.get_server_flavor()
port_client = self.manager.ports_client
self.green = self.setup_vm_enviornment(self.manager, 'green', True)
vm_id = self.green['serv1']['id']
security_groups = [{'name': self.green['security_group']['id']}]
# Boot instance vm2
t_serv2 = self.create_server_on_network(
self.green['network'], security_groups,
image=image,
flavor=flavor,
name=self.green['network']['name'])
# Boot instance vm3
t_serv3 = self.create_server_on_network(
self.green['network'], security_groups,
image=self.get_server_image(),
flavor=self.get_server_flavor(),
name=self.green['network']['name'])
self.check_server_connected(t_serv2)
port1_id = self.green['fip1']['port_id']
port2_id = self.get_port_id(port_client=port_client, vm_info=t_serv2)
port3_id = self.get_port_id(port_client=port_client, vm_info=t_serv3)
# Update vm1 port to disbale port security
port_client.update_port(
port_id=port1_id,
port_security_enabled='false')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
for exclude_vm in exclude_list:
if vm_id in exclude_vm:
LOG.info(_LI("Vm1 in exclude list"))
vm2_id = t_serv2['id']
# Update vm2 port to disable port security
port_client.update_port(
port_id=port2_id,
port_security_enabled='false')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
# Check vm2 in exclude list or not
for exclude_vm in exclude_list:
if vm2_id in exclude_vm:
LOG.info(_LI("Vm2 in exclude list"))
vm3_id = t_serv3['id']
# Update vm3 port to enable port security
port_client.update_port(
port_id=port3_id,
port_security_enabled='false')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
# Check vm3 in exclude list or not
for exclude_vm in exclude_list:
if vm3_id in exclude_vm:
LOG.info(_LI("Vm3 in exclude list"))
# Update vm1 port to enable port security
port_client.update_port(
port_id=port1_id,
port_security_enabled='true')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
# Check vm should not be in exclude list
for exclude_vm in exclude_list:
if vm_id not in exclude_vm:
LOG.info(_LI("Vm1 not in exclude list"))
@test.attr(type='nsxv')
@test.idempotent_id('f034d3e9-d717-4bcd-8e6e-18e9ada7b81a')
def test_exclude_list_with_single_vm_port(self):
port_client = self.manager.ports_client
self.green = self.setup_vm_enviornment(self.manager, 'green', True)
port_id = self.green['fip1']['port_id']
# Update vm port to disable port security
port_client.update_port(
port_id=port_id,
port_security_enabled='false')
vm_id = self.green['serv1']['id']
# Check vm in exclude list or not
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
for exclude_vm in exclude_list:
if vm_id in exclude_vm:
LOG.info(_LI("Vm in exclude list"))
port_client.update_port(
port_id=port_id,
port_security_enabled='true')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
if exclude_vm in exclude_list:
if vm_id not in exclude_vm:
LOG.info(_LI("Vm not in exclude list"))
self.interface_client.delete_interface(vm_id, port_id)
@test.attr(type='nsxv')
@test.idempotent_id('3ad04e37-2a9f-4465-86e7-94993eecdfa1')
def test_disabled_network_port_security(self):
network_client = self.manager.networks_client
port_client = self.manager.ports_client
net_id, network, subnet =\
self.create_project_network_subnet('admin')
kwargs = {'port_security_enabled': 'false'}
# Update network to disbale port security
network_client.update_network(network_id=net_id, **kwargs)
name = 'disabled-port-security-port'
kwargs = {'name': name, 'network_id': net_id}
# Create port under network
port = HELO.create_port(self, client=port_client, **kwargs)
port_id = port['id']
# Check port security of created port
port_details = port_client.show_port(port_id=port_id)
if (port_details['port']['port_security_enabled'] == 'false'):
LOG.info(_LI("Port security of port is disabled"))
kwargs = {'port_security_enabled': 'true'}
# Update port security of network to enabled
network_client.update_network(network_id=net_id, **kwargs)
name = 'disabled-port-security-port'
kwargs = {'name': name, 'network_id': net_id}
port = HELO.create_port(self, client=port_client, **kwargs)
port_id = port['id']
port_details = port_client.show_port(port_id=port_id)
if (port_details['port']['port_security_enabled'] == 'true'):
LOG.info(_LI("Port security of port is enabled"))
@test.attr(type='nsxv')
@test.idempotent_id('c8683cb7-4be5-4670-95c6-344a0aea3667')
def test_exclude_list_with_multiple_ports(self):
port_client = self.manager.ports_client
self.green = self.setup_vm_enviornment(self.manager, 'green', True)
vm_id = self.green['serv1']['id']
net_id = self.green['network']['id']
name = 'disabled-port-security-port1'
kwargs = {'name': name, 'network_id': net_id,
'port_security_enabled': 'false'}
port1 = HELO.create_port(self, client=port_client, **kwargs)
port2 = HELO.create_port(self, client=port_client, **kwargs)
port1_id = port1['id']
kwargs = {'port_id': port1_id}
self.interface_client.create_interface(vm_id, **kwargs)
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
for exclude_vm in exclude_list:
if vm_id in exclude_vm:
LOG.info(_LI("Vm in exclude list"))
name = 'disabled-port-security-port2'
kwargs = {'name': name, 'network_id': net_id,
'port_security_enabled': 'false'}
port2_id = port2['id']
kwargs = {'port_id': port2_id}
self.interface_client.create_interface(vm_id, **kwargs)
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
for exclude_vm in exclude_list:
if vm_id in exclude_vm:
LOG.info(_LI("Vm in exclude list"))
port_client.update_port(
port_id=port2_id,
port_security_enabled='true')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
for exclude_vm in exclude_list:
if vm_id in exclude_vm:
LOG.info(_LI("Vm in exclude list"))
port_client.update_port(
port_id=port1_id,
port_security_enabled='true')
items = self.vsm.get_excluded_vm_name_list()
exclude_list = [item.encode('utf-8') for item in items]
if exclude_vm in exclude_list:
if vm_id not in exclude_vm:
LOG.info(_LI("Vm not in exclude list"))
self.interface_client.delete_interface(vm_id, port1_id)
self.interface_client.delete_interface(vm_id, port2_id)