NSXP policy support for cert cases
Change-Id: I6031b445c3e20f20b4c8d3c13f9a857b68cc2834
This commit is contained in:
parent
4c4d34e231
commit
570833ca32
@ -356,3 +356,46 @@ class NSXPClient(object):
|
||||
endpoint = "/logical-routers/%s/routing/advertisement" % lrouter['id']
|
||||
response = self.get(endpoint)
|
||||
return response.json()
|
||||
|
||||
def get_qos_profiles(self):
|
||||
"""
|
||||
Get all user defined qos-profiles
|
||||
"""
|
||||
endpoint = "qos-profiles"
|
||||
return self.get_logical_resources(endpoint)
|
||||
|
||||
def get_qos_profile(self, os_name, os_uuid):
|
||||
"""
|
||||
Get the qos-profile based on the name and uuid provided.
|
||||
|
||||
The name of the qos-profile should follow
|
||||
<os_network_name>_<first 5 os uuid>...<last 5 os uuid>
|
||||
Return qos-profile if found, otherwise return None
|
||||
"""
|
||||
if not os_name or not os_uuid:
|
||||
LOG.error("Name and uuid of Openstack qos-profile need to be "
|
||||
"present in order to query backend qos-profile!")
|
||||
return None
|
||||
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
|
||||
qos_profile = self.get_qos_profiles()
|
||||
return self.get_nsx_resource_by_name(qos_profile, nsx_name)
|
||||
|
||||
def get_logical_ports(self, nsx_network):
|
||||
"""
|
||||
Retrieve all logical ports of segments on NSX backend
|
||||
"""
|
||||
return self.get_logical_resources(
|
||||
"segments/%s/ports" % nsx_network['id'])
|
||||
|
||||
def get_logical_port(self, os_name, nsx_network):
|
||||
"""
|
||||
Get the logical port based on the os_name provided.
|
||||
The name of the logical port shoud match the os_name.
|
||||
Return the logical port if found, otherwise return None.
|
||||
"""
|
||||
if not os_name:
|
||||
LOG.error("Name of OS port should be present "
|
||||
"in order to query backend logical port created")
|
||||
return None
|
||||
lports = self.get_logical_ports(nsx_network)
|
||||
return self.get_nsx_resource_by_name(lports, os_name)
|
||||
|
@ -118,12 +118,16 @@ class TestCertificateMgmt(manager.NetworkScenarioTest):
|
||||
cls.policies_created.append(policy)
|
||||
return policy
|
||||
|
||||
def parse_response(self, response):
|
||||
def parse_response(self, response, ports=False):
|
||||
"""
|
||||
Parse response from NSX backend to check if NSX
|
||||
is unable to delete or modify openstack entities
|
||||
"""
|
||||
msg = 'Error: NSX admin is able to modify/delete'
|
||||
if ports:
|
||||
self.error_msg = 'cannot be deleted as either it '\
|
||||
'has children or it is being'\
|
||||
' referenced by other objects'
|
||||
if all(x in response.json()['error_message'] for x in self.error_msg):
|
||||
LOG.info('NSX admin is unable to modify/delete '
|
||||
'the openstack object')
|
||||
@ -141,14 +145,26 @@ class TestCertificateMgmt(manager.NetworkScenarioTest):
|
||||
self.port = self._create_port(network_id=self.network['id'],
|
||||
namestart='ca')
|
||||
msg = 'Logical Port %s not found' % self.port['name']
|
||||
self.assertIsNotNone(self.nsx.get_logical_port(self.port['name']),
|
||||
msg)
|
||||
data = self.nsx.get_logical_port(self.port['name'])
|
||||
if CONF.network.backend == 'nsxp':
|
||||
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||
nsx_network = self.nsxp.get_logical_switch(self.network['name'],
|
||||
self.network['id'])
|
||||
self.assertIsNotNone(self.nsxp.get_logical_port(self.port['name'],
|
||||
nsx_network),
|
||||
msg)
|
||||
nsx_port = self.nsxp.get_logical_port(self.port['name'],
|
||||
nsx_network)
|
||||
data = {'nsx_network': nsx_network, 'nsx_port': nsx_port}
|
||||
else:
|
||||
self.assertIsNotNone(self.nsx.get_logical_port(self.port['name']),
|
||||
msg)
|
||||
data = self.nsx.get_logical_port(self.port['name'])
|
||||
return data
|
||||
|
||||
|
||||
class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
openstack_tag = 'com.vmware.nsx.openstack'
|
||||
policy_tag = 'nsx_policy'
|
||||
|
||||
@decorators.attr(type='nsxv3')
|
||||
@decorators.idempotent_id('6cb32a2b-048a-47a3-b0ed-f6337b81377f')
|
||||
@ -176,28 +192,30 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||
self.assertIsNotNone(self.nsxp.get_logical_switch(
|
||||
self.network['name'], self.network['id']), msg)
|
||||
self.assertIsNotNone(self.nsx.get_logical_switch(
|
||||
self.network['name'], self.network['id']), msg)
|
||||
if CONF.network.backend == 'nsxp':
|
||||
data_policy = self.nsxp.get_logical_switch(self.network['name'],
|
||||
self.network['id'])
|
||||
self.assertEqual(data_policy['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
data = self.nsx.get_logical_switch(self.network['name'],
|
||||
self.network['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#try to update network name as NSX admin
|
||||
if CONF.network.backend == 'nsxp':
|
||||
data = self.nsx.get_logical_switch(self.network['name'],
|
||||
self.network['id'])
|
||||
self.assertEqual(data['_create_user'], self.policy_tag,
|
||||
'Incorrect tag for the create user')
|
||||
data_policy.update({"display_name": "nsx_modified_switch"})
|
||||
response = self.nsxp.ca_put_request(component='segments',
|
||||
comp_id=data_policy['id'],
|
||||
body=data_policy)
|
||||
else:
|
||||
self.assertIsNotNone(self.nsx.get_logical_switch(
|
||||
self.network['name'], self.network['id']), msg)
|
||||
data = self.nsx.get_logical_switch(self.network['name'],
|
||||
self.network['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#try to update network name as NSX admin
|
||||
data.update({"display_name": "nsx_modified_switch"})
|
||||
response = self.nsx.ca_put_request(component='segments',
|
||||
comp_id=data['id'],
|
||||
@ -226,16 +244,31 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
self.router['id'], subnet_id=self.subnet['id'])
|
||||
#check backend if the router was created
|
||||
msg = 'router %s not found' % self.router['name']
|
||||
self.assertIsNotNone(self.nsx.get_logical_router(
|
||||
self.router['name'], self.router['id']), msg)
|
||||
data = self.nsx.get_logical_router(self.router['name'],
|
||||
self.router['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
if CONF.network.backend == 'nsxp':
|
||||
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||
self.assertIsNotNone(self.nsxp.get_logical_router(
|
||||
self.router['name'], self.router['id']), msg)
|
||||
self.assertIsNotNone(self.nsxp.get_logical_router(
|
||||
self.router['name'], self.router['id']), msg)
|
||||
data_policy = self.nsxp.get_logical_router(self.router['name'],
|
||||
self.router['id'])
|
||||
self.assertEqual(data_policy['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
data = self.nsx.get_logical_router(self.router['name'],
|
||||
self.router['id'])
|
||||
self.assertEqual(data['_create_user'], self.policy_tag,
|
||||
'Incorrect tag for the create user')
|
||||
else:
|
||||
self.assertIsNotNone(self.nsx.get_logical_router(
|
||||
self.router['name'], self.router['id']), msg)
|
||||
data = self.nsx.get_logical_router(self.router['name'],
|
||||
self.router['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#Obtain any router port corresponding to the logical router
|
||||
rtr_ports = self.nsx.get_logical_router_ports(data)
|
||||
#try to update router name as NSX admin
|
||||
@ -265,29 +298,63 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.adm_qos_client.delete_policy, policy['id'])
|
||||
#obtain all switching profiles at the backend
|
||||
qos_policies = self.nsx.get_switching_profiles()
|
||||
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
|
||||
policy['name'])
|
||||
#check backend if the qos policy was created
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(self.nsx.get_switching_profile(
|
||||
nsx_policy['id']), msg)
|
||||
data = self.nsx.get_switching_profile(nsx_policy['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#try to update qos policy as NSX admin
|
||||
data.update({"display_name": "nsx_modified_qos-policy"})
|
||||
response = self.nsx.ca_put_request(component='switching-profiles',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete qos policy as NSX admin
|
||||
response = self.nsx.ca_delete_request(component='switching-profiles',
|
||||
comp_id=data['id'])
|
||||
self.parse_response(response)
|
||||
if CONF.network.backend == 'nsxp':
|
||||
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||
nsx_policy = self.nsxp.get_qos_profile(policy['name'],
|
||||
policy['id'])
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(nsx_policy, msg)
|
||||
self.assertEqual(nsx_policy['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
qos_policies = self.nsx.get_switching_profiles()
|
||||
nsx_name = policy['name'] + "_" + policy['id'][:5]\
|
||||
+ "..." + policy['id'][-5:]
|
||||
nsx_policy_v3 = self.nsx.get_nsx_resource_by_name(qos_policies,
|
||||
nsx_name)
|
||||
#check backend if the qos policy was created
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(self.nsx.get_switching_profile(
|
||||
nsx_policy_v3['id']), msg)
|
||||
data = self.nsx.get_switching_profile(nsx_policy_v3['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.policy_tag,
|
||||
'Incorrect tag for the create user')
|
||||
|
||||
data = nsx_policy
|
||||
data.update({"display_name": "nsx_modified_qos-policy"})
|
||||
response = self.nsxp.ca_put_request(component='qos-profiles',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
response = self.nsxp.ca_delete_request(component='qos-profiles',
|
||||
comp_id=data['id'])
|
||||
self.parse_response(response)
|
||||
else:
|
||||
qos_policies = self.nsx.get_switching_profiles()
|
||||
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
|
||||
policy['name'])
|
||||
#check backend if the qos policy was created
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(self.nsx.get_switching_profile(
|
||||
nsx_policy['id']), msg)
|
||||
data = self.nsx.get_switching_profile(nsx_policy['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#try to update qos policy as NSX admin
|
||||
data.update({"display_name": "nsx_modified_qos-policy"})
|
||||
response = self.nsx.ca_put_request(component='switching-profiles',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete qos policy as NSX admin
|
||||
response = self.nsx.ca_delete_request(
|
||||
component='switching-profiles', comp_id=data['id'])
|
||||
self.parse_response(response)
|
||||
|
||||
@decorators.attr(type='nsxv3')
|
||||
@decorators.idempotent_id('2b232060-dc42-4b2d-8185-64bd12e46e55')
|
||||
@ -301,29 +368,60 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
self.security_group = self._create_security_group()
|
||||
#check backend if the firewall section was created
|
||||
msg = 'Security group %s not found' % self.security_group['name']
|
||||
self.assertIsNotNone(self.nsx.get_firewall_section(
|
||||
self.security_group['name'], self.security_group['id']), msg)
|
||||
data = self.nsx.get_firewall_section(self.security_group['name'],
|
||||
self.security_group['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#obtain firewall rules related to the security group
|
||||
fw_rules = self.nsx.get_firewall_section_rules(data)
|
||||
#try to update security group as NSX admin
|
||||
data.update({"display_name": "nsx_modified_security_group"})
|
||||
response = self.nsx.ca_put_request(component='firewall/sections',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete logical firewall rule as NSX admin
|
||||
if len(fw_rules) != 0:
|
||||
component = 'firewall/sections/' + data['id'] + '/rules'
|
||||
response = self.nsx.ca_delete_request(component=component,
|
||||
comp_id=fw_rules[0]['id'])
|
||||
if CONF.network.backend == 'nsxp':
|
||||
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
||||
self.assertIsNotNone(self.nsxp.get_firewall_section(
|
||||
self.security_group['name'], self.security_group['id'],
|
||||
os_tenant_id=self.security_group['tenant_id']), msg)
|
||||
self.assertIsNotNone(self.nsx.get_firewall_section(
|
||||
self.security_group['name'],
|
||||
self.security_group['id'], nsxp=True), msg)
|
||||
data = self.nsxp.get_firewall_section(
|
||||
self.security_group['name'],
|
||||
self.security_group['id'],
|
||||
os_tenant_id=self.security_group['tenant_id'])
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
fw_rules = self.nsxp.get_firewall_section_rules(
|
||||
data,
|
||||
tenant_id=self.security_group['tenant_id'])
|
||||
data.update({"display_name": "nsx_modified_security_group"})
|
||||
response = self.nsxp.ca_put_request(
|
||||
component='domains/%s/security-policies' % (
|
||||
self.security_group['tenant_id']),
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
if len(fw_rules) != 0:
|
||||
component = 'domains/%s/security-policies/%s/rules' % (
|
||||
self.security_group['tenant_id'], data['id'])
|
||||
response = self.nsxp.ca_delete_request(
|
||||
component=component, comp_id=fw_rules[0]['id'])
|
||||
self.parse_response(response)
|
||||
|
||||
else:
|
||||
self.assertIsNotNone(self.nsx.get_firewall_section(
|
||||
self.security_group['name'], self.security_group['id']), msg)
|
||||
data = self.nsx.get_firewall_section(self.security_group['name'],
|
||||
self.security_group['id'])
|
||||
"""
|
||||
Check if backend shows openstack
|
||||
as the create user for the object
|
||||
"""
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#obtain firewall rules related to the security group
|
||||
fw_rules = self.nsx.get_firewall_section_rules(data)
|
||||
#try to update security group as NSX admin
|
||||
data.update({"display_name": "nsx_modified_security_group"})
|
||||
response = self.nsx.ca_put_request(component='firewall/sections',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete logical firewall rule as NSX admin
|
||||
if len(fw_rules) != 0:
|
||||
component = 'firewall/sections/' + data['id'] + '/rules'
|
||||
response = self.nsx.ca_delete_request(
|
||||
component=component, comp_id=fw_rules[0]['id'])
|
||||
self.parse_response(response)
|
||||
|
||||
@decorators.attr(type='nsxv3')
|
||||
@decorators.idempotent_id('b10d5ede-d1c7-47a0-9d55-b9aabc8f0af1')
|
||||
@ -337,17 +435,31 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
as the create user for the object
|
||||
"""
|
||||
data = self.ca_topo()
|
||||
if CONF.network.backend == 'nsxp':
|
||||
nsx_network = data['nsx_network']
|
||||
data = data['nsx_port']
|
||||
self.assertEqual(data['_create_user'], self.openstack_tag,
|
||||
'Incorrect tag for the create user')
|
||||
#try to update logical port as NSX admin
|
||||
data.update({"display_name": "nsx_modified_logical_port"})
|
||||
response = self.nsx.ca_put_request(component='logical-ports',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete logical port as NSX admin
|
||||
response = self.nsx.ca_delete_request(component='logical-ports',
|
||||
comp_id=data['id'])
|
||||
self.parse_response(response)
|
||||
if CONF.network.backend == 'nsxp':
|
||||
response = self.nsxp.ca_put_request(
|
||||
component='segments/%s/ports' % nsx_network['id'],
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete logical port as NSX admin
|
||||
response = self.nsxp.ca_delete_request(
|
||||
component='segments/%s/ports' % nsx_network['id'],
|
||||
comp_id=data['id'])
|
||||
self.parse_response(response, ports=True)
|
||||
else:
|
||||
response = self.nsx.ca_put_request(component='logical-ports',
|
||||
comp_id=data['id'], body=data)
|
||||
self.parse_response(response)
|
||||
#try to delete logical port as NSX admin
|
||||
response = self.nsx.ca_delete_request(component='logical-ports',
|
||||
comp_id=data['id'])
|
||||
self.parse_response(response)
|
||||
|
||||
@decorators.attr(type='nsxv3')
|
||||
@decorators.idempotent_id('280cdcc6-5bd0-472c-a8a9-954dd612a0a6')
|
||||
@ -362,19 +474,28 @@ class TestCertificateMgmtOps(TestCertificateMgmt):
|
||||
shared=False)
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
self.adm_qos_client.delete_policy, policy['id'])
|
||||
#obtain all switching profiles at the backend
|
||||
qos_policies = self.nsx.get_switching_profiles()
|
||||
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
|
||||
policy['name'])
|
||||
#check backend if the qos policy was created
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(self.nsx.get_switching_profile(
|
||||
nsx_policy['id']), msg)
|
||||
data = self.nsx.get_switching_profile(nsx_policy['id'])
|
||||
#try to delete qos policy as NSX admin
|
||||
endpoint = ("/%s/%s" % ('switching-profiles',
|
||||
data['id']))
|
||||
response = self.nsx.delete_super_admin(endpoint)
|
||||
if CONF.network.backend == 'nsxp':
|
||||
nsx_policy = self.nsxp.get_qos_profile(policy['name'],
|
||||
policy['id'])
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(nsx_policy['id'], msg)
|
||||
data = nsx_policy
|
||||
endpoint = ("qos-profiles/%s" % data['id'])
|
||||
response = self.nsxp.delete_super_admin(endpoint)
|
||||
else:
|
||||
#obtain all switching profiles at the backend
|
||||
qos_policies = self.nsx.get_switching_profiles()
|
||||
nsx_policy = self.nsx.get_nsx_resource_by_name(qos_policies,
|
||||
policy['name'])
|
||||
#check backend if the qos policy was created
|
||||
msg = 'Qos policy %s not found' % policy['name']
|
||||
self.assertIsNotNone(self.nsx.get_switching_profile(
|
||||
nsx_policy['id']), msg)
|
||||
data = self.nsx.get_switching_profile(nsx_policy['id'])
|
||||
#try to delete qos policy as NSX admin
|
||||
endpoint = ("/%s/%s" % ('switching-profiles',
|
||||
data['id']))
|
||||
response = self.nsx.delete_super_admin(endpoint)
|
||||
self.assertEqual(response.status_code, 200,
|
||||
"Superadmin unable to "
|
||||
"delete the qos switching profile")
|
||||
|
Loading…
x
Reference in New Issue
Block a user