From 570833ca32bf682286e33abbacd9933f47ba3803 Mon Sep 17 00:00:00 2001 From: Shubhamk Kadam Date: Mon, 8 Apr 2019 16:07:20 +0000 Subject: [PATCH] NSXP policy support for cert cases Change-Id: I6031b445c3e20f20b4c8d3c13f9a857b68cc2834 --- .../services/nsxp_client.py | 43 +++ .../scenario/test_client_cert_mgmt_ops.py | 305 ++++++++++++------ 2 files changed, 256 insertions(+), 92 deletions(-) diff --git a/vmware_nsx_tempest_plugin/services/nsxp_client.py b/vmware_nsx_tempest_plugin/services/nsxp_client.py index 47670b1..b34dd57 100644 --- a/vmware_nsx_tempest_plugin/services/nsxp_client.py +++ b/vmware_nsx_tempest_plugin/services/nsxp_client.py @@ -356,3 +356,46 @@ class NSXPClient(object): endpoint = "/logical-routers/%s/routing/advertisement" % lrouter['id'] response = self.get(endpoint) return response.json() + + def get_qos_profiles(self): + """ + Get all user defined qos-profiles + """ + endpoint = "qos-profiles" + return self.get_logical_resources(endpoint) + + def get_qos_profile(self, os_name, os_uuid): + """ + Get the qos-profile based on the name and uuid provided. + + The name of the qos-profile should follow + _... + Return qos-profile if found, otherwise return None + """ + if not os_name or not os_uuid: + LOG.error("Name and uuid of Openstack qos-profile need to be " + "present in order to query backend qos-profile!") + return None + nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:] + qos_profile = self.get_qos_profiles() + return self.get_nsx_resource_by_name(qos_profile, nsx_name) + + def get_logical_ports(self, nsx_network): + """ + Retrieve all logical ports of segments on NSX backend + """ + return self.get_logical_resources( + "segments/%s/ports" % nsx_network['id']) + + def get_logical_port(self, os_name, nsx_network): + """ + Get the logical port based on the os_name provided. + The name of the logical port shoud match the os_name. + Return the logical port if found, otherwise return None. + """ + if not os_name: + LOG.error("Name of OS port should be present " + "in order to query backend logical port created") + return None + lports = self.get_logical_ports(nsx_network) + return self.get_nsx_resource_by_name(lports, os_name) diff --git a/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py b/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py index bffdc9a..051fcb1 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv3/scenario/test_client_cert_mgmt_ops.py @@ -118,12 +118,16 @@ class TestCertificateMgmt(manager.NetworkScenarioTest): cls.policies_created.append(policy) return policy - def parse_response(self, response): + def parse_response(self, response, ports=False): """ Parse response from NSX backend to check if NSX is unable to delete or modify openstack entities """ msg = 'Error: NSX admin is able to modify/delete' + if ports: + self.error_msg = 'cannot be deleted as either it '\ + 'has children or it is being'\ + ' referenced by other objects' if all(x in response.json()['error_message'] for x in self.error_msg): LOG.info('NSX admin is unable to modify/delete ' 'the openstack object') @@ -141,14 +145,26 @@ class TestCertificateMgmt(manager.NetworkScenarioTest): self.port = self._create_port(network_id=self.network['id'], namestart='ca') msg = 'Logical Port %s not found' % self.port['name'] - self.assertIsNotNone(self.nsx.get_logical_port(self.port['name']), - msg) - data = self.nsx.get_logical_port(self.port['name']) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_network = self.nsxp.get_logical_switch(self.network['name'], + self.network['id']) + self.assertIsNotNone(self.nsxp.get_logical_port(self.port['name'], + nsx_network), + msg) + nsx_port = self.nsxp.get_logical_port(self.port['name'], + nsx_network) + data = {'nsx_network': nsx_network, 'nsx_port': nsx_port} + else: + self.assertIsNotNone(self.nsx.get_logical_port(self.port['name']), + msg) + data = self.nsx.get_logical_port(self.port['name']) return data class TestCertificateMgmtOps(TestCertificateMgmt): openstack_tag = 'com.vmware.nsx.openstack' + policy_tag = 'nsx_policy' @decorators.attr(type='nsxv3') @decorators.idempotent_id('6cb32a2b-048a-47a3-b0ed-f6337b81377f') @@ -176,28 +192,30 @@ class TestCertificateMgmtOps(TestCertificateMgmt): time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) self.assertIsNotNone(self.nsxp.get_logical_switch( self.network['name'], self.network['id']), msg) - self.assertIsNotNone(self.nsx.get_logical_switch( - self.network['name'], self.network['id']), msg) - if CONF.network.backend == 'nsxp': data_policy = self.nsxp.get_logical_switch(self.network['name'], self.network['id']) self.assertEqual(data_policy['_create_user'], self.openstack_tag, 'Incorrect tag for the create user') - data = self.nsx.get_logical_switch(self.network['name'], - self.network['id']) - """ - Check if backend shows openstack - as the create user for the object - """ - self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') - #try to update network name as NSX admin - if CONF.network.backend == 'nsxp': + data = self.nsx.get_logical_switch(self.network['name'], + self.network['id']) + self.assertEqual(data['_create_user'], self.policy_tag, + 'Incorrect tag for the create user') data_policy.update({"display_name": "nsx_modified_switch"}) response = self.nsxp.ca_put_request(component='segments', comp_id=data_policy['id'], body=data_policy) else: + self.assertIsNotNone(self.nsx.get_logical_switch( + self.network['name'], self.network['id']), msg) + data = self.nsx.get_logical_switch(self.network['name'], + self.network['id']) + """ + Check if backend shows openstack + as the create user for the object + """ + self.assertEqual(data['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') + #try to update network name as NSX admin data.update({"display_name": "nsx_modified_switch"}) response = self.nsx.ca_put_request(component='segments', comp_id=data['id'], @@ -226,16 +244,31 @@ class TestCertificateMgmtOps(TestCertificateMgmt): self.router['id'], subnet_id=self.subnet['id']) #check backend if the router was created msg = 'router %s not found' % self.router['name'] - self.assertIsNotNone(self.nsx.get_logical_router( - self.router['name'], self.router['id']), msg) - data = self.nsx.get_logical_router(self.router['name'], - self.router['id']) - """ - Check if backend shows openstack - as the create user for the object - """ - self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNotNone(self.nsxp.get_logical_router( + self.router['name'], self.router['id']), msg) + self.assertIsNotNone(self.nsxp.get_logical_router( + self.router['name'], self.router['id']), msg) + data_policy = self.nsxp.get_logical_router(self.router['name'], + self.router['id']) + self.assertEqual(data_policy['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') + data = self.nsx.get_logical_router(self.router['name'], + self.router['id']) + self.assertEqual(data['_create_user'], self.policy_tag, + 'Incorrect tag for the create user') + else: + self.assertIsNotNone(self.nsx.get_logical_router( + self.router['name'], self.router['id']), msg) + data = self.nsx.get_logical_router(self.router['name'], + self.router['id']) + """ + Check if backend shows openstack + as the create user for the object + """ + self.assertEqual(data['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') #Obtain any router port corresponding to the logical router rtr_ports = self.nsx.get_logical_router_ports(data) #try to update router name as NSX admin @@ -265,29 +298,63 @@ class TestCertificateMgmtOps(TestCertificateMgmt): self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) #obtain all switching profiles at the backend - qos_policies = self.nsx.get_switching_profiles() - nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, - policy['name']) - #check backend if the qos policy was created - msg = 'Qos policy %s not found' % policy['name'] - self.assertIsNotNone(self.nsx.get_switching_profile( - nsx_policy['id']), msg) - data = self.nsx.get_switching_profile(nsx_policy['id']) - """ - Check if backend shows openstack - as the create user for the object - """ - self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') - #try to update qos policy as NSX admin - data.update({"display_name": "nsx_modified_qos-policy"}) - response = self.nsx.ca_put_request(component='switching-profiles', - comp_id=data['id'], body=data) - self.parse_response(response) - #try to delete qos policy as NSX admin - response = self.nsx.ca_delete_request(component='switching-profiles', - comp_id=data['id']) - self.parse_response(response) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + nsx_policy = self.nsxp.get_qos_profile(policy['name'], + policy['id']) + msg = 'Qos policy %s not found' % policy['name'] + self.assertIsNotNone(nsx_policy, msg) + self.assertEqual(nsx_policy['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') + qos_policies = self.nsx.get_switching_profiles() + nsx_name = policy['name'] + "_" + policy['id'][:5]\ + + "..." + policy['id'][-5:] + nsx_policy_v3 = self.nsx.get_nsx_resource_by_name(qos_policies, + nsx_name) + #check backend if the qos policy was created + msg = 'Qos policy %s not found' % policy['name'] + self.assertIsNotNone(self.nsx.get_switching_profile( + nsx_policy_v3['id']), msg) + data = self.nsx.get_switching_profile(nsx_policy_v3['id']) + """ + Check if backend shows openstack + as the create user for the object + """ + self.assertEqual(data['_create_user'], self.policy_tag, + 'Incorrect tag for the create user') + + data = nsx_policy + data.update({"display_name": "nsx_modified_qos-policy"}) + response = self.nsxp.ca_put_request(component='qos-profiles', + comp_id=data['id'], body=data) + self.parse_response(response) + response = self.nsxp.ca_delete_request(component='qos-profiles', + comp_id=data['id']) + self.parse_response(response) + else: + qos_policies = self.nsx.get_switching_profiles() + nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, + policy['name']) + #check backend if the qos policy was created + msg = 'Qos policy %s not found' % policy['name'] + self.assertIsNotNone(self.nsx.get_switching_profile( + nsx_policy['id']), msg) + data = self.nsx.get_switching_profile(nsx_policy['id']) + """ + Check if backend shows openstack + as the create user for the object + """ + self.assertEqual(data['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') + #try to update qos policy as NSX admin + data.update({"display_name": "nsx_modified_qos-policy"}) + response = self.nsx.ca_put_request(component='switching-profiles', + comp_id=data['id'], body=data) + self.parse_response(response) + #try to delete qos policy as NSX admin + response = self.nsx.ca_delete_request( + component='switching-profiles', comp_id=data['id']) + self.parse_response(response) @decorators.attr(type='nsxv3') @decorators.idempotent_id('2b232060-dc42-4b2d-8185-64bd12e46e55') @@ -301,29 +368,60 @@ class TestCertificateMgmtOps(TestCertificateMgmt): self.security_group = self._create_security_group() #check backend if the firewall section was created msg = 'Security group %s not found' % self.security_group['name'] - self.assertIsNotNone(self.nsx.get_firewall_section( - self.security_group['name'], self.security_group['id']), msg) - data = self.nsx.get_firewall_section(self.security_group['name'], - self.security_group['id']) - """ - Check if backend shows openstack - as the create user for the object - """ - self.assertEqual(data['_create_user'], self.openstack_tag, - 'Incorrect tag for the create user') - #obtain firewall rules related to the security group - fw_rules = self.nsx.get_firewall_section_rules(data) - #try to update security group as NSX admin - data.update({"display_name": "nsx_modified_security_group"}) - response = self.nsx.ca_put_request(component='firewall/sections', - comp_id=data['id'], body=data) - self.parse_response(response) - #try to delete logical firewall rule as NSX admin - if len(fw_rules) != 0: - component = 'firewall/sections/' + data['id'] + '/rules' - response = self.nsx.ca_delete_request(component=component, - comp_id=fw_rules[0]['id']) + if CONF.network.backend == 'nsxp': + time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) + self.assertIsNotNone(self.nsxp.get_firewall_section( + self.security_group['name'], self.security_group['id'], + os_tenant_id=self.security_group['tenant_id']), msg) + self.assertIsNotNone(self.nsx.get_firewall_section( + self.security_group['name'], + self.security_group['id'], nsxp=True), msg) + data = self.nsxp.get_firewall_section( + self.security_group['name'], + self.security_group['id'], + os_tenant_id=self.security_group['tenant_id']) + self.assertEqual(data['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') + fw_rules = self.nsxp.get_firewall_section_rules( + data, + tenant_id=self.security_group['tenant_id']) + data.update({"display_name": "nsx_modified_security_group"}) + response = self.nsxp.ca_put_request( + component='domains/%s/security-policies' % ( + self.security_group['tenant_id']), + comp_id=data['id'], body=data) self.parse_response(response) + if len(fw_rules) != 0: + component = 'domains/%s/security-policies/%s/rules' % ( + self.security_group['tenant_id'], data['id']) + response = self.nsxp.ca_delete_request( + component=component, comp_id=fw_rules[0]['id']) + self.parse_response(response) + + else: + self.assertIsNotNone(self.nsx.get_firewall_section( + self.security_group['name'], self.security_group['id']), msg) + data = self.nsx.get_firewall_section(self.security_group['name'], + self.security_group['id']) + """ + Check if backend shows openstack + as the create user for the object + """ + self.assertEqual(data['_create_user'], self.openstack_tag, + 'Incorrect tag for the create user') + #obtain firewall rules related to the security group + fw_rules = self.nsx.get_firewall_section_rules(data) + #try to update security group as NSX admin + data.update({"display_name": "nsx_modified_security_group"}) + response = self.nsx.ca_put_request(component='firewall/sections', + comp_id=data['id'], body=data) + self.parse_response(response) + #try to delete logical firewall rule as NSX admin + if len(fw_rules) != 0: + component = 'firewall/sections/' + data['id'] + '/rules' + response = self.nsx.ca_delete_request( + component=component, comp_id=fw_rules[0]['id']) + self.parse_response(response) @decorators.attr(type='nsxv3') @decorators.idempotent_id('b10d5ede-d1c7-47a0-9d55-b9aabc8f0af1') @@ -337,17 +435,31 @@ class TestCertificateMgmtOps(TestCertificateMgmt): as the create user for the object """ data = self.ca_topo() + if CONF.network.backend == 'nsxp': + nsx_network = data['nsx_network'] + data = data['nsx_port'] self.assertEqual(data['_create_user'], self.openstack_tag, 'Incorrect tag for the create user') #try to update logical port as NSX admin data.update({"display_name": "nsx_modified_logical_port"}) - response = self.nsx.ca_put_request(component='logical-ports', - comp_id=data['id'], body=data) - self.parse_response(response) - #try to delete logical port as NSX admin - response = self.nsx.ca_delete_request(component='logical-ports', - comp_id=data['id']) - self.parse_response(response) + if CONF.network.backend == 'nsxp': + response = self.nsxp.ca_put_request( + component='segments/%s/ports' % nsx_network['id'], + comp_id=data['id'], body=data) + self.parse_response(response) + #try to delete logical port as NSX admin + response = self.nsxp.ca_delete_request( + component='segments/%s/ports' % nsx_network['id'], + comp_id=data['id']) + self.parse_response(response, ports=True) + else: + response = self.nsx.ca_put_request(component='logical-ports', + comp_id=data['id'], body=data) + self.parse_response(response) + #try to delete logical port as NSX admin + response = self.nsx.ca_delete_request(component='logical-ports', + comp_id=data['id']) + self.parse_response(response) @decorators.attr(type='nsxv3') @decorators.idempotent_id('280cdcc6-5bd0-472c-a8a9-954dd612a0a6') @@ -362,19 +474,28 @@ class TestCertificateMgmtOps(TestCertificateMgmt): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #obtain all switching profiles at the backend - qos_policies = self.nsx.get_switching_profiles() - nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, - policy['name']) - #check backend if the qos policy was created - msg = 'Qos policy %s not found' % policy['name'] - self.assertIsNotNone(self.nsx.get_switching_profile( - nsx_policy['id']), msg) - data = self.nsx.get_switching_profile(nsx_policy['id']) - #try to delete qos policy as NSX admin - endpoint = ("/%s/%s" % ('switching-profiles', - data['id'])) - response = self.nsx.delete_super_admin(endpoint) + if CONF.network.backend == 'nsxp': + nsx_policy = self.nsxp.get_qos_profile(policy['name'], + policy['id']) + msg = 'Qos policy %s not found' % policy['name'] + self.assertIsNotNone(nsx_policy['id'], msg) + data = nsx_policy + endpoint = ("qos-profiles/%s" % data['id']) + response = self.nsxp.delete_super_admin(endpoint) + else: + #obtain all switching profiles at the backend + qos_policies = self.nsx.get_switching_profiles() + nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies, + policy['name']) + #check backend if the qos policy was created + msg = 'Qos policy %s not found' % policy['name'] + self.assertIsNotNone(self.nsx.get_switching_profile( + nsx_policy['id']), msg) + data = self.nsx.get_switching_profile(nsx_policy['id']) + #try to delete qos policy as NSX admin + endpoint = ("/%s/%s" % ('switching-profiles', + data['id'])) + response = self.nsx.delete_super_admin(endpoint) self.assertEqual(response.status_code, 200, "Superadmin unable to " "delete the qos switching profile")