Bug:2333131

Adding updated policy path for port security and security group

Change-Id: I86c3d1cdfb8febb30359943c699f7de73edbee7b
This commit is contained in:
Deepthi Kandavara Jayarama 2019-04-25 19:41:28 +00:00
parent 02d1107cca
commit 1cde95eb3c
3 changed files with 106 additions and 45 deletions

View File

@ -414,3 +414,23 @@ class NSXPClient(object):
segment_id, port_id) segment_id, port_id)
response = self.get(endpoint) response = self.get(endpoint)
return response return response
def get_neutron_ns_group_id(self):
"""
Retrieve NSGroup Id
"""
nsx_nsgroup = self.get_ns_groups(tenant_id='default')
for group in nsx_nsgroup:
if group['display_name'] == 'neutron_excluded_ports_group':
nsgroup_id = group['id']
return nsgroup_id
def get_ns_group_port_members(self, ns_group_id, tenant_id):
"""
Retrieve NSGroup port members
"""
endpoint = "domains/%s/groups/%s/members/logical-ports" % (
tenant_id, ns_group_id)
response = self.get(endpoint=endpoint)
res_json = response.json()
return res_json

View File

@ -138,7 +138,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg, dfw_error_msg,
secgroup['tenant_id']) 'default')
else: else:
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg) self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
@ -157,7 +157,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule( nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy, nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id']) secgroup_rule['id'], 'default')
self.assertIsNotNone(nsx_dfw_rule) self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule( nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section, nsx_dfw_section,
@ -169,7 +169,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule( nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy, nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id']) secgroup_rule['id'], 'default')
self.assertIsNone(nsx_dfw_rule) self.assertIsNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule( nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section, nsx_dfw_section,
@ -188,7 +188,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize( self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg, secgroup, dfw_error_msg,
secgroup['tenant_id']) 'default')
else: else:
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg) self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
@ -212,7 +212,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self._wait_till_firewall_gets_realize( self._wait_till_firewall_gets_realize(
updated_secgroup, dfw_error_msg, updated_secgroup, dfw_error_msg,
updated_secgroup['tenant_id']) 'default')
else: else:
self._wait_till_firewall_gets_realize(updated_secgroup, self._wait_till_firewall_gets_realize(updated_secgroup,
dfw_error_msg) dfw_error_msg)
@ -229,7 +229,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self._wait_till_firewall_gets_realize(secgroup, self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg, dfw_error_msg,
secgroup['tenant_id']) 'default')
else: else:
self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg) self._wait_till_firewall_gets_realize(secgroup, dfw_error_msg)
self.assertEqual(secgroup['name'], name) self.assertEqual(secgroup['name'], name)
@ -239,15 +239,15 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_nsgroup_policy = self.nsxp.get_ns_group( nsx_nsgroup_policy = self.nsxp.get_ns_group(
secgroup['name'], secgroup['id'], secgroup['name'], secgroup['id'],
os_tenant_id=secgroup['tenant_id']) os_tenant_id='default')
self.assertIsNone(nsx_nsgroup_policy) self.assertIsNone(nsx_nsgroup_policy)
nsx_dfw_section_policy = self.nsxp.get_firewall_section( nsx_dfw_section_policy = self.nsxp.get_firewall_section(
secgroup['name'], secgroup['id'], secgroup['name'], secgroup['id'],
os_tenant_id=secgroup['tenant_id']) os_tenant_id='default')
self.assertIsNone(nsx_dfw_section_policy, dfw_error_msg) self.assertIsNone(nsx_dfw_section_policy, dfw_error_msg)
nsx_nsgroup = self.nsx.get_ns_group( nsx_nsgroup = self.nsx.get_ns_group(
secgroup['name'], secgroup['id'], nsxp=True, secgroup['name'], secgroup['id'], nsxp=True,
os_tenant_id=secgroup['tenant_id']) os_tenant_id='default')
nsx_dfw_section = self.nsx.get_firewall_section( nsx_dfw_section = self.nsx.get_firewall_section(
secgroup['name'], secgroup['id'], nsxp=True) secgroup['name'], secgroup['id'], nsxp=True)
else: else:
@ -270,7 +270,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize( self._wait_till_firewall_gets_realize(
secgroup, dfw_error_msg, secgroup, dfw_error_msg,
secgroup['tenant_id']) 'default')
else: else:
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, self._wait_till_firewall_gets_realize(secgroup,
@ -307,7 +307,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule( nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy, nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id']) secgroup_rule['id'], 'default')
self.assertIsNotNone(nsx_dfw_rule) self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule( nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section, nsx_dfw_section,
@ -342,7 +342,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, self._wait_till_firewall_gets_realize(secgroup,
dfw_error_msg, dfw_error_msg,
secgroup['tenant_id']) 'default')
else: else:
nsx_nsgroup, nsx_dfw_section = \ nsx_nsgroup, nsx_dfw_section = \
self._wait_till_firewall_gets_realize(secgroup, self._wait_till_firewall_gets_realize(secgroup,
@ -362,7 +362,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule( nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy, nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id']) secgroup_rule['id'], 'default')
self.assertIsNotNone(nsx_dfw_rule) self.assertIsNotNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule( nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section, nsx_dfw_section,
@ -374,7 +374,7 @@ class NSXv3SecGroupTest(base.BaseSecGroupTest):
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_dfw_rule = self.nsxp.get_firewall_section_rule( nsx_dfw_rule = self.nsxp.get_firewall_section_rule(
nsx_dfw_section_policy, nsx_dfw_section_policy,
secgroup_rule['id'], secgroup_rule['tenant_id']) secgroup_rule['id'], 'default')
self.assertIsNone(nsx_dfw_rule) self.assertIsNone(nsx_dfw_rule)
nsx_dfw_rule = self.nsx.get_firewall_section_rule( nsx_dfw_rule = self.nsx.get_firewall_section_rule(
nsx_dfw_section, nsx_dfw_section,

View File

@ -23,6 +23,7 @@ from tempest.lib.common.utils import test_utils
from tempest.lib import decorators from tempest.lib import decorators
from vmware_nsx_tempest_plugin.common import constants from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.tests.scenario import manager from vmware_nsx_tempest_plugin.tests.scenario import manager
@ -57,6 +58,9 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password) CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def setUp(self): def setUp(self):
super(TestNSXv3PortSecurityScenario, self).setUp() super(TestNSXv3PortSecurityScenario, self).setUp()
@ -263,25 +267,44 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
kwargs = {"port_security_enabled": "false", "security_groups": []} kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client.update_port(port_id['port']['id'], **kwargs) port_client.update_port(port_id['port']['id'], **kwargs)
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
nsgroup_id = self.nsx.get_neutron_ns_group_id() if CONF.network.backend == 'nsxp':
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) nsgroup_id = self.nsxp.get_neutron_ns_group_id()
instance = "instance-port_%s" % port_id['port']['id'][0:4] nsxgroup_data = self.nsxp.get_ns_group_port_members(nsgroup_id,
for nsxgroup in nsxgroup_data['results']: 'default')
if instance in nsxgroup['target_display_name']: instance = "instance-port_%s" % port_id['port']['id'][0:4]
break for nsxgroup in nsxgroup_data['results']:
if instance in nsxgroup['display_name']:
break
else:
nsgroup_id = self.nsx.get_neutron_ns_group_id()
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
instance = "instance-port_%s" % port_id['port']['id'][0:4]
for nsxgroup in nsxgroup_data['results']:
if instance in nsxgroup['target_display_name']:
break
status.append('True') status.append('True')
kwargs = {"port_security_enabled": "true"} kwargs = {"port_security_enabled": "true"}
port_client.update_port(port_id['port']['id'], **kwargs) port_client.update_port(port_id['port']['id'], **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
nsgroup_id = self.nsx.get_neutron_ns_group_id() if CONF.network.backend == 'nsxp':
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) nsgroup_id = self.nsxp.get_neutron_ns_group_id()
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) nsxgroup_data = self.nsxp.get_ns_group_port_members(nsgroup_id,
for nsxgroup in nsxgroup_data['results']: 'default')
if instance in nsxgroup['target_display_name']: for nsxgroup in nsxgroup_data['results']:
status.append('True') if instance in nsxgroup['display_name']:
return status status.append('True')
else: return status
continue else:
continue
else:
nsgroup_id = self.nsx.get_neutron_ns_group_id()
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
for nsxgroup in nsxgroup_data['results']:
if instance in nsxgroup['target_display_name']:
status.append('True')
return status
else:
continue
status.append('False') status.append('False')
return status return status
@ -367,27 +390,45 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
sec_group = sec_grp_port['port']['security_groups'][0] sec_group = sec_grp_port['port']['security_groups'][0]
port_client.update_port(port_id, **kwargs) port_client.update_port(port_id, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
nsgroup_id = self.nsx.get_neutron_ns_group_id() if CONF.network.backend == 'nsxp':
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) nsgroup_id = self.nsxp.get_neutron_ns_group_id()
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) nsxgroup_data = self.nsxp.get_ns_group_port_members(nsgroup_id,
instance = "instance-port_%s" % port_id[0:4] 'default')
for nsxgroup in nsxgroup_data['results']: instance = "instance-port_%s" % port_id[0:4]
if instance in nsxgroup['target_display_name']: for nsxgroup in nsxgroup_data['results']:
break if instance in nsxgroup['display_name']:
break
else:
nsgroup_id = self.nsx.get_neutron_ns_group_id()
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
instance = "instance-port_%s" % port_id[0:4]
for nsxgroup in nsxgroup_data['results']:
if instance in nsxgroup['target_display_name']:
break
status.append('True') status.append('True')
kwargs = {"port_security_enabled": "true", kwargs = {"port_security_enabled": "true",
"security_groups": [sec_group]} "security_groups": [sec_group]}
port_client.update_port(port_id, **kwargs) port_client.update_port(port_id, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
nsgroup_id = self.nsx.get_neutron_ns_group_id() if CONF.network.backend == 'nsxp':
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) nsgroup_id = self.nsxp.get_neutron_ns_group_id()
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) nsxgroup_data = self.nsxp.get_ns_group_port_members(nsgroup_id,
for nsxgroup in nsxgroup_data['results']: 'default')
if instance in nsxgroup['target_display_name']: for nsxgroup in nsxgroup_data['results']:
status.append('True') if instance in nsxgroup['display_name']:
return status status.append('True')
else: return status
continue else:
continue
else:
nsgroup_id = self.nsx.get_neutron_ns_group_id()
nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id)
for nsxgroup in nsxgroup_data['results']:
if instance in nsxgroup['target_display_name']:
status.append('True')
return status
else:
continue
status.append('False') status.append('False')
return status return status