Merge "[Tempest]: Added api cases for port security feature."
This commit is contained in:
commit
8f23451a42
@ -36,3 +36,8 @@ MD_ERROR_CODE_WHEN_LS_BOUNDED = "10026"
|
|||||||
INTERVAL_BETWEEN_EXEC_RETRY_ON_SSH = 5
|
INTERVAL_BETWEEN_EXEC_RETRY_ON_SSH = 5
|
||||||
MAX_NO_OF_TIMES_EXECUTION_OVER_SSH = 30
|
MAX_NO_OF_TIMES_EXECUTION_OVER_SSH = 30
|
||||||
MD_BASE_URL = "http://169.254.169.254/"
|
MD_BASE_URL = "http://169.254.169.254/"
|
||||||
|
|
||||||
|
# NSXV3 Port Security constants.
|
||||||
|
NSX_BACKEND_TIME_INTERVAL = 30
|
||||||
|
NSX_BACKEND_SMALL_TIME_INTERVAL = 10
|
||||||
|
NSX_BACKEND_VERY_SMALL_TIME_INTERVAL = 5
|
||||||
|
@ -349,6 +349,25 @@ class NSXV3Client(object):
|
|||||||
"""
|
"""
|
||||||
return self.get_logical_resources("/ns-groups")
|
return self.get_logical_resources("/ns-groups")
|
||||||
|
|
||||||
|
def get_neutron_ns_group_id(self):
|
||||||
|
"""
|
||||||
|
Retrieve NSGroup Id
|
||||||
|
"""
|
||||||
|
nsx_nsgroup = self.get_ns_groups()
|
||||||
|
for group in nsx_nsgroup:
|
||||||
|
if group['display_name'] == 'neutron_excluded_port_nsgroup':
|
||||||
|
nsgroup_id = group['id']
|
||||||
|
return nsgroup_id
|
||||||
|
|
||||||
|
def get_ns_group_port_members(self, ns_group_id):
|
||||||
|
"""
|
||||||
|
Retrieve NSGroup port members
|
||||||
|
"""
|
||||||
|
endpoint = "/ns-groups/%s/effective-logical-port-members" % ns_group_id
|
||||||
|
response = self.get(endpoint=endpoint)
|
||||||
|
res_json = response.json()
|
||||||
|
return res_json
|
||||||
|
|
||||||
def get_ns_group(self, os_name, os_uuid):
|
def get_ns_group(self, os_name, os_uuid):
|
||||||
"""
|
"""
|
||||||
Get the NSGroup based on the name provided.
|
Get the NSGroup based on the name provided.
|
||||||
|
276
vmware_nsx_tempest/tests/nsxv3/api/test_nsx_port_security.py
Normal file
276
vmware_nsx_tempest/tests/nsxv3/api/test_nsx_port_security.py
Normal file
@ -0,0 +1,276 @@
|
|||||||
|
# Copyright 2017 VMware Inc
|
||||||
|
# All Rights Reserved
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from tempest.api.network import base
|
||||||
|
from tempest import config
|
||||||
|
from tempest.lib.common.utils import test_utils
|
||||||
|
from tempest.lib import decorators
|
||||||
|
from tempest.lib import exceptions
|
||||||
|
|
||||||
|
from tempest import test
|
||||||
|
from vmware_nsx_tempest.common import constants
|
||||||
|
from vmware_nsx_tempest.services import nsxv3_client
|
||||||
|
|
||||||
|
CONF = config.CONF
|
||||||
|
|
||||||
|
|
||||||
|
class NSXv3PortSecurity(base.BaseAdminNetworkTest):
|
||||||
|
"""Test Port security of Port
|
||||||
|
|
||||||
|
1. Create/Update port with port security enable and disable.
|
||||||
|
2. Create/Update port security and check at beckend NSGroup.
|
||||||
|
3. Check non admin tenant shouldn't update port security of admin port.
|
||||||
|
4. Check non-admin tenant can't delete port security of admin port.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def skip_checks(cls):
|
||||||
|
super(NSXv3PortSecurity, cls).skip_checks()
|
||||||
|
if not test.is_extension_enabled('port-security-enabled', 'network'):
|
||||||
|
msg = "Extension port-security-enabled is not enabled."
|
||||||
|
raise cls.skipException(msg)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setup_clients(cls):
|
||||||
|
super(NSXv3PortSecurity, cls).setup_clients()
|
||||||
|
cls.cmgr_pri = cls.get_client_manager('primary')
|
||||||
|
cls.cmgr_alt = cls.get_client_manager('alt')
|
||||||
|
cls.cmgr_adm = cls.get_client_manager('admin')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def resource_setup(cls):
|
||||||
|
super(NSXv3PortSecurity, cls).resource_setup()
|
||||||
|
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
|
||||||
|
CONF.nsxv3.nsx_user,
|
||||||
|
CONF.nsxv3.nsx_password)
|
||||||
|
cls.network = cls.create_network()
|
||||||
|
|
||||||
|
def get_tag_port_id(self, nsxgroup_data, org_port_id):
|
||||||
|
"""
|
||||||
|
Method to get port of neutron corresponding to backend port-id
|
||||||
|
"""
|
||||||
|
for ports in self.nsx.get_logical_ports():
|
||||||
|
for port_id in nsxgroup_data['results']:
|
||||||
|
if ports['display_name'] == port_id['target_display_name']:
|
||||||
|
for tag in ports['tags']:
|
||||||
|
if tag['scope'] == "os-neutron-port-id" and \
|
||||||
|
tag['tag'] == org_port_id:
|
||||||
|
corresponding_port_id = ports['display_name']
|
||||||
|
return corresponding_port_id
|
||||||
|
|
||||||
|
def check_port_not_exists_in_os_group(self, nsxgroup_data,
|
||||||
|
corresponding_port_id):
|
||||||
|
"""
|
||||||
|
Method to check neutron port not exists in neutron OSGroup
|
||||||
|
"""
|
||||||
|
if nsxgroup_data['results'] != []:
|
||||||
|
for port_id in nsxgroup_data['results']:
|
||||||
|
if corresponding_port_id != port_id['target_display_name']:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def check_port_exists_in_os_group(self, nsxgroup_data,
|
||||||
|
corresponding_port_id):
|
||||||
|
"""
|
||||||
|
Method to check neutron port exists in neutron OSGroup
|
||||||
|
"""
|
||||||
|
for port_id in nsxgroup_data['results']:
|
||||||
|
if corresponding_port_id == port_id['target_display_name']:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _create_network_topo(self, client):
|
||||||
|
"""
|
||||||
|
Method to create network topology which includes network, subnet
|
||||||
|
and port
|
||||||
|
"""
|
||||||
|
net_client = client.networks_client
|
||||||
|
body = {'name': 'port-security-network'}
|
||||||
|
network = net_client.create_network(**body)
|
||||||
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||||
|
net_client.delete_network,
|
||||||
|
network['network']['id'])
|
||||||
|
body = {"network_id": network['network']['id'],
|
||||||
|
"allocation_pools": [{"start": "2.0.0.2",
|
||||||
|
"end": "2.0.0.254"}],
|
||||||
|
"ip_version": 4, "cidr": "2.0.0.0/24"}
|
||||||
|
subnet_client = client.subnets_client
|
||||||
|
subnet = subnet_client.create_subnet(**body)
|
||||||
|
body = {"network_id": network['network']['id'],
|
||||||
|
"admin_state_up": "true",
|
||||||
|
"port_security_enabled": "false", "security_groups": []}
|
||||||
|
port_client = client.ports_client
|
||||||
|
port = port_client.create_port(**body)
|
||||||
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||||
|
port_client.delete_port,
|
||||||
|
port['port']['id'])
|
||||||
|
network_topo = dict(network=network, subnet=subnet, port=port)
|
||||||
|
return network_topo
|
||||||
|
|
||||||
|
def _create_security_group_and_return_id(self, client):
|
||||||
|
"""
|
||||||
|
Method to create security group and return id
|
||||||
|
"""
|
||||||
|
security_client = client.security_groups_client
|
||||||
|
create_body = security_client.create_security_group(name='sec-group')
|
||||||
|
secgroup = create_body['security_group']
|
||||||
|
# Sleep for 5 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
|
||||||
|
secgroup_id = secgroup['id']
|
||||||
|
return secgroup_id
|
||||||
|
|
||||||
|
@test.attr(type='nsxv3')
|
||||||
|
@decorators.idempotent_id('50203701-1cda-4f31-806d-7a51514b9664')
|
||||||
|
def test_create_port_with_security_enabled_check_in_neutron_database(self):
|
||||||
|
secgroup_id = self._create_security_group_and_return_id(self.cmgr_adm)
|
||||||
|
network_topo = self._create_network_topo(self.cmgr_adm)
|
||||||
|
port_client = self.cmgr_adm.ports_client
|
||||||
|
port_id = network_topo['port']['port']['id']
|
||||||
|
port_detail = port_client.show_port(port_id)
|
||||||
|
self.assertEqual(False, port_detail['port']["port_security_enabled"])
|
||||||
|
body = {"port_security_enabled": "true",
|
||||||
|
"security_groups": [secgroup_id]}
|
||||||
|
port_client.update_port(port_id, **body)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
port_detail = port_client.show_port(port_id)
|
||||||
|
self.assertEqual(True, port_detail['port']["port_security_enabled"])
|
||||||
|
|
||||||
|
@test.attr(type='nsxv3')
|
||||||
|
@decorators.idempotent_id('4b765fa2-345f-4d2c-928f-ad4b347936fd')
|
||||||
|
def test_create_port_with_security_enabled_check_at_beckend(self):
|
||||||
|
net_client = self.cmgr_adm.networks_client
|
||||||
|
body = {'name': 'port-security-network'}
|
||||||
|
network = net_client.create_network(**body)
|
||||||
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||||
|
net_client.delete_network,
|
||||||
|
network['network']['id'])
|
||||||
|
body = {"network_id": network['network']['id'],
|
||||||
|
"allocation_pools": [{"start": "2.0.0.2",
|
||||||
|
"end": "2.0.0.254"}],
|
||||||
|
"ip_version": 4, "cidr": "2.0.0.0/24"}
|
||||||
|
subnet_client = self.cmgr_adm.subnets_client
|
||||||
|
subnet_client.create_subnet(**body)
|
||||||
|
body = {"network_id": network['network']['id'],
|
||||||
|
"admin_state_up": "true",
|
||||||
|
"port_security_enabled": "false", "security_groups": []}
|
||||||
|
port_client = self.cmgr_adm.ports_client
|
||||||
|
port_id = port_client.create_port(**body)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||||
|
port_client.delete_port,
|
||||||
|
port_id['port']['id'])
|
||||||
|
port_detail = port_client.show_port(port_id['port']['id'])
|
||||||
|
self.assertEqual(False, port_detail['port']["port_security_enabled"])
|
||||||
|
org_port_id = port_id['port']['id']
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
||||||
|
org_port_id)
|
||||||
|
status = self.check_port_exists_in_os_group(nsxgroup_data,
|
||||||
|
corresponding_port_id)
|
||||||
|
self.assertEqual(True, status)
|
||||||
|
|
||||||
|
@test.attr(type='nsxv3')
|
||||||
|
@decorators.idempotent_id('bcafeb10-fbf1-4c97-9e4f-50e56d32bdcf')
|
||||||
|
def test_non_admin_cannot_update_admin_port_with_security(self):
|
||||||
|
network_topo = self._create_network_topo(self.cmgr_adm)
|
||||||
|
tenant_port_client = self.cmgr_alt.ports_client
|
||||||
|
kwargs = {"port_security_enabled": "true"}
|
||||||
|
self.assertRaises(exceptions.NotFound,
|
||||||
|
tenant_port_client.update_port,
|
||||||
|
network_topo['port']['port']['id'],
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
|
@test.attr(type='nsxv3')
|
||||||
|
@decorators.idempotent_id('cf4b8d28-13c4-4339-993f-66070708e914')
|
||||||
|
def test_non_admin_cannot_delete_tenant_port_with_port_security(self):
|
||||||
|
network_topo = self._create_network_topo(self.cmgr_pri)
|
||||||
|
tenant_port_client = self.cmgr_alt.ports_client
|
||||||
|
self.assertRaises(exceptions.NotFound,
|
||||||
|
tenant_port_client.delete_port,
|
||||||
|
network_topo['port']['port']['id'])
|
||||||
|
|
||||||
|
@test.attr(type='nsxv3')
|
||||||
|
@decorators.idempotent_id('ee6213ac-dfcd-401b-bbc6-03afd26f203a')
|
||||||
|
def test_tenant_port_security_at_beckend_after_enable_disable(self):
|
||||||
|
secgroup_id = self._create_security_group_and_return_id(self.cmgr_alt)
|
||||||
|
network_topo = self._create_network_topo(self.cmgr_alt)
|
||||||
|
port_client = self.cmgr_alt.ports_client
|
||||||
|
kwargs = {"port_security_enabled": "false", "security_groups": []}
|
||||||
|
org_port_id = network_topo['port']['port']['id']
|
||||||
|
port_client.update_port(org_port_id,
|
||||||
|
**kwargs)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
|
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
||||||
|
org_port_id)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
status = self.check_port_exists_in_os_group(nsxgroup_data,
|
||||||
|
corresponding_port_id)
|
||||||
|
self.assertEqual(True, status)
|
||||||
|
kwargs = {"port_security_enabled": "true",
|
||||||
|
"security_groups": [secgroup_id]}
|
||||||
|
port_client.update_port(org_port_id,
|
||||||
|
**kwargs)
|
||||||
|
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
|
status = self.check_port_not_exists_in_os_group(nsxgroup_data,
|
||||||
|
corresponding_port_id)
|
||||||
|
self.assertEqual(False, status)
|
||||||
|
|
||||||
|
@test.attr(type='nsxv3')
|
||||||
|
@decorators.idempotent_id('c6f4c2f2-3fc9-4983-a05a-bb3a3dc35ad8')
|
||||||
|
def test_admin_port_security_at_beckend_after_enable_disable(self):
|
||||||
|
secgroup_id = self._create_security_group_and_return_id(self.cmgr_adm)
|
||||||
|
network_topo = self._create_network_topo(self.cmgr_adm)
|
||||||
|
port_client = self.cmgr_adm.ports_client
|
||||||
|
kwargs = {"port_security_enabled": "false",
|
||||||
|
"security_groups": []}
|
||||||
|
org_port_id = network_topo['port']['port']['id']
|
||||||
|
port_client.update_port(org_port_id,
|
||||||
|
**kwargs)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
|
corresponding_port_id = self.get_tag_port_id(nsxgroup_data,
|
||||||
|
org_port_id)
|
||||||
|
# Sleep for 10 sec
|
||||||
|
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
|
||||||
|
status = self.check_port_exists_in_os_group(nsxgroup_data,
|
||||||
|
corresponding_port_id)
|
||||||
|
self.assertEqual(True, status)
|
||||||
|
kwargs = {"port_security_enabled": "true",
|
||||||
|
"security_groups": [secgroup_id]}
|
||||||
|
port_client.update_port(org_port_id, **kwargs)
|
||||||
|
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
|
||||||
|
nsgroup_id = self.nsx.get_neutron_ns_group_id()
|
||||||
|
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
|
||||||
|
status = self.check_port_not_exists_in_os_group(nsxgroup_data,
|
||||||
|
corresponding_port_id)
|
||||||
|
self.assertEqual(False, status)
|
Loading…
Reference in New Issue
Block a user