From de71fe73346d8a9089d5c6e829ee6b16378e97b5 Mon Sep 17 00:00:00 2001 From: Deepthi Kandavara Jayarama Date: Tue, 16 Apr 2019 22:48:31 +0000 Subject: [PATCH] [Policy] Converting port security tests to policy Change-Id: Ib7b35c1bb898203ed2ff5937c8652338dfdc9b47 --- .../services/nsxv3_client.py | 16 +++-- .../tests/nsxv3/api/test_nsx_port_security.py | 72 +++++++++++++++---- 2 files changed, 68 insertions(+), 20 deletions(-) diff --git a/vmware_nsx_tempest_plugin/services/nsxv3_client.py b/vmware_nsx_tempest_plugin/services/nsxv3_client.py index 04eaaec..6180251 100644 --- a/vmware_nsx_tempest_plugin/services/nsxv3_client.py +++ b/vmware_nsx_tempest_plugin/services/nsxv3_client.py @@ -402,15 +402,21 @@ class NSXV3Client(object): """ return self.get_logical_resources("/ns-groups") - def get_neutron_ns_group_id(self): + def get_neutron_ns_group_id(self, nsxp=False): """ Retrieve NSGroup Id """ nsx_nsgroup = self.get_ns_groups() - for group in nsx_nsgroup: - if group['display_name'] == 'neutron_excluded_port_nsgroup': - nsgroup_id = group['id'] - return nsgroup_id + if nsxp: + for group in nsx_nsgroup: + if group['display_name'] == ( + 'default.neutron_excluded_ports_group'): + nsgroup_id = group['id'] + else: + for group in nsx_nsgroup: + if group['display_name'] == 'neutron_excluded_port_nsgroup': + nsgroup_id = group['id'] + return nsgroup_id def get_ns_group_port_members(self, ns_group_id): """ diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_port_security.py b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_port_security.py index 528a487..8530534 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_port_security.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/api/test_nsx_port_security.py @@ -23,6 +23,7 @@ from tempest.lib import exceptions from tempest import test from vmware_nsx_tempest_plugin.common import constants +from vmware_nsx_tempest_plugin.services import nsxp_client from vmware_nsx_tempest_plugin.services import nsxv3_client CONF = config.CONF @@ -57,6 +58,9 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager, CONF.nsxv3.nsx_user, CONF.nsxv3.nsx_password) + cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager, + CONF.nsxv3.nsx_user, + CONF.nsxv3.nsx_password) cls.network = cls.create_network() def get_tag_port_id(self, nsxgroup_data, org_port_id): @@ -136,6 +140,17 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): secgroup_id = secgroup['id'] return secgroup_id + def _create_security_group(self, client): + """ + Method to create security group and return id + """ + security_client = client.security_groups_client + create_body = security_client.create_security_group(name='sec-group') + secgroup = create_body['security_group'] + # Sleep for 5 sec + time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL) + return secgroup + @decorators.attr(type='nsxv3') @decorators.idempotent_id('50203701-1cda-4f31-806d-7a51514b9664') def test_create_port_with_security_enabled_check_in_neutron_database(self): @@ -181,7 +196,11 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): port_detail = port_client.show_port(port_id['port']['id']) self.assertEqual(False, port_detail['port']["port_security_enabled"]) org_port_id = port_id['port']['id'] - nsgroup_id = self.nsx.get_neutron_ns_group_id() + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id(nsxp=True) + else: + nsgroup_id = self.nsx.get_neutron_ns_group_id() nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) # Sleep for 10 sec time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) @@ -214,16 +233,24 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): @decorators.attr(type='nsxv3') @decorators.idempotent_id('ee6213ac-dfcd-401b-bbc6-03afd26f203a') def test_tenant_port_security_at_beckend_after_enable_disable(self): - secgroup_id = self._create_security_group_and_return_id(self.cmgr_alt) + secgroup = self._create_security_group(self.cmgr_alt) network_topo = self._create_network_topo(self.cmgr_alt) port_client = self.cmgr_alt.ports_client kwargs = {"port_security_enabled": "false", "security_groups": []} org_port_id = network_topo['port']['port']['id'] port_client.update_port(org_port_id, **kwargs) - # Sleep for 10 sec - time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) - nsgroup_id = self.nsx.get_neutron_ns_group_id() + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_nsgroup_policy = self.nsxp.get_ns_group( + secgroup['name'], secgroup['id'], + os_tenant_id=secgroup['tenant_id']) + self.assertIsNotNone(nsx_nsgroup_policy) + nsgroup_id = self.nsx.get_neutron_ns_group_id(nsxp=True) + else: + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) corresponding_port_id = self.get_tag_port_id(nsxgroup_data, org_port_id) @@ -233,11 +260,14 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): corresponding_port_id) self.assertEqual(True, status) kwargs = {"port_security_enabled": "true", - "security_groups": [secgroup_id]} + "security_groups": [secgroup['id']]} port_client.update_port(org_port_id, **kwargs) - time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) - nsgroup_id = self.nsx.get_neutron_ns_group_id() + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + else: + time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) status = self.check_port_not_exists_in_os_group(nsxgroup_data, corresponding_port_id) @@ -246,7 +276,7 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): @decorators.attr(type='nsxv3') @decorators.idempotent_id('c6f4c2f2-3fc9-4983-a05a-bb3a3dc35ad8') def test_admin_port_security_at_beckend_after_enable_disable(self): - secgroup_id = self._create_security_group_and_return_id(self.cmgr_adm) + secgroup = self._create_security_group(self.cmgr_adm) network_topo = self._create_network_topo(self.cmgr_adm) port_client = self.cmgr_adm.ports_client kwargs = {"port_security_enabled": "false", @@ -254,9 +284,17 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): org_port_id = network_topo['port']['port']['id'] port_client.update_port(org_port_id, **kwargs) - # Sleep for 10 sec - time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) - nsgroup_id = self.nsx.get_neutron_ns_group_id() + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_nsgroup_policy = self.nsxp.get_ns_group( + secgroup['name'], secgroup['id'], + os_tenant_id=secgroup['tenant_id']) + self.assertIsNotNone(nsx_nsgroup_policy) + nsgroup_id = self.nsx.get_neutron_ns_group_id(nsxp=True) + else: + # Sleep for 10 sec + time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) corresponding_port_id = self.get_tag_port_id(nsxgroup_data, org_port_id) @@ -266,10 +304,14 @@ class NSXv3PortSecurity(base.BaseAdminNetworkTest): corresponding_port_id) self.assertEqual(True, status) kwargs = {"port_security_enabled": "true", - "security_groups": [secgroup_id]} + "security_groups": [secgroup['id']]} port_client.update_port(org_port_id, **kwargs) - time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) - nsgroup_id = self.nsx.get_neutron_ns_group_id() + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) + else: + time.sleep(constants.NSX_BACKEND_TIME_INTERVAL) + nsgroup_id = self.nsx.get_neutron_ns_group_id() nsxgroup_data = self.nsx.get_ns_group_port_members(nsgroup_id) status = self.check_port_not_exists_in_os_group(nsxgroup_data, corresponding_port_id)