[Policy] Migrating QoS API tests to policy

Change-Id: I94bc76a3a5be7a23cf5d17d5325ab253b8160ad8
This commit is contained in:
Deepthi Kandavara Jayarama 2019-04-04 01:37:00 +00:00
parent 586361584d
commit 3ccb55b701
3 changed files with 284 additions and 19 deletions

View File

@ -399,3 +399,18 @@ class NSXPClient(object):
return None
lports = self.get_logical_ports(nsx_network)
return self.get_nsx_resource_by_name(lports, os_name)
def get_port_qos_profile_binding_map(self, segment_id, port_id):
"""
Get the qos profile associated with the port.
Return qos profile id if found, otherwise return None
"""
if not segment_id or not port_id:
LOG.error("segment id and port id need to be "
"present in order to query backend port QoS Profiles!")
return None
endpoint = "segments/%s/ports/%s/port-qos-profile-binding-maps" % (
segment_id, port_id)
response = self.get(endpoint)
return response

View File

@ -304,12 +304,6 @@ class NSXV3Client(object):
"""
return self.get_logical_resources("/logical-switches")
def get_logical_switch_profiles(self):
"""
Retrieve all switching profiles on NSX backend
"""
return self.get_logical_resources("/switching-profiles")
def get_switching_profiles(self):
"""
Retrieve all switching profiles on NSX backend
@ -671,3 +665,19 @@ class NSXV3Client(object):
:return: returns list of health monitors information.
"""
return self.get_logical_resources("/loadbalancer/monitors")
def get_qos_profile(self, os_name, os_uuid):
"""
Get the qos profile based on the name and uuid provided.
The name of the qos profile should follow
<os_qos_policy_name>_<first 5 os uuid>...<last 5 os uuid>
Return qos profile if found, otherwise return None
"""
if not os_name or not os_uuid:
LOG.error("Name and uuid of OpenStack QoS policy need to be "
"present in order to query backend QoS Profiles!")
return None
nsx_name = os_name + "_" + os_uuid[:5] + "..." + os_uuid[-5:]
qos_profiles = self.get_switching_profiles()
return self.get_nsx_resource_by_name(qos_profiles, nsx_name)

View File

@ -14,6 +14,7 @@
# under the License.
import testtools
import time
from tempest.api.network import base
from tempest import config
@ -23,6 +24,9 @@ from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.services.qos import base_qos
CONF = config.CONF
@ -56,6 +60,12 @@ class BaseQosTest(base.BaseAdminNetworkTest):
cls.qos_available_rule_types = (
cls.adm_qos_client.available_rule_types())
cls.policies_created = []
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
@classmethod
def resource_cleanup(cls):
@ -165,6 +175,74 @@ class BaseQosTest(base.BaseAdminNetworkTest):
policy_id, dscp_mark, **kwargs)
return rule
def verify_backend(self, policy):
"""Verify backend NSXT for the policy created."""
#check backend if the policy was created
msg = 'QoS Policy %s not found' % policy['name']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNotNone(self.nsxp.get_qos_profile(
policy['name'], policy['id']), msg)
#Checking the MP backend for qos profiles
self.assertIsNotNone(self.nsx.get_qos_profile(
policy['name'], policy['id']), msg)
def verify_backend_port(self, policy, network, port):
"""Verify backend NSXT port is updated with qos policy."""
#check backend if the policy was created
msg = 'QoS Policy %s not attached to the port' % policy['name']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
segment = self.nsxp.get_logical_switch(network['name'], network['id'])
self.assertEqual(policy['id'],
self.nsxp.get_port_qos_profile_binding_map(
segment['id'], port['id']), msg)
def verify_backend_bandwidth_rule(self, policy, rule):
"""Verify backend NSXT for the rule created."""
#check backend if the rule was created
msg = 'QoS Rule %s not found' % rule['id']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id'])
self.assertEqual((rule['max_kbps'] / 1000) * 2,
rule_backend['shaper_configurations'][0]['peak_bandwidth'], msg)
self.assertEqual((rule['max_kbps'] / 1000),
rule_backend['shaper_configurations'][0]['average_bandwidth'], msg)
if rule['direction'] == 'egress':
self.assertEqual('IngressRateLimiter',
rule_backend['shaper_configurations'][0]['resource_type'], msg)
else:
self.assertEqual('EgressRateLimiter',
rule_backend['shaper_configurations'][0]['resource_type'], msg)
def verify_backend_bandwidth_rules(self, policy, rule, index):
"""Verify backend NSXT for the rule created."""
#check backend if the rule was created
msg = 'QoS Rule %s not found' % rule['id']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
rules_backend = self.nsxp.get_qos_profile(policy['name'], policy['id'])
self.assertEqual((rule['max_kbps'] / 1000) * 2,
rules_backend['shaper_configurations'][index]['peak_bandwidth'],
msg)
self.assertEqual((rule['max_kbps'] / 1000),
rules_backend['shaper_configurations'][index]['average_bandwidth'],
msg)
if rule['direction'] == 'egress':
self.assertEqual('IngressRateLimiter',
rules_backend['shaper_configurations'][index]['resource_type'],
msg)
else:
self.assertEqual('EgressRateLimiter',
rules_backend['shaper_configurations'][index]['resource_type'],
msg)
def verify_backend_dscp_rule(self, policy, rule):
"""Verify backend NSXT for the rule created."""
#check backend if the rule was created
msg = 'QoS Rule %s not found' % rule['id']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id'])
self.assertEqual(rule['dscp_mark'], rule_backend['dscp']['priority'],
msg)
class QosPolicyTest(BaseQosTest):
"""QoS Policy CURD operations.
@ -187,7 +265,9 @@ class QosPolicyTest(BaseQosTest):
self.assertEqual('test policy desc1',
retrieved_policy['description'])
self.assertFalse(retrieved_policy['shared'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
# Test 'list policies'
policies = self.adm_qos_client.list_policies()
policies_ids = [p['id'] for p in policies]
@ -202,10 +282,16 @@ class QosPolicyTest(BaseQosTest):
name=name1, description='test policy', shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy_name1['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy_name1)
policy_name2 = self.create_qos_policy(
name=name2, description='test policy', shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy_name2['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy_name2)
policies = self.adm_qos_client.list_policies(name=name1)
self.assertEqual(1, len(policies))
@ -223,6 +309,9 @@ class QosPolicyTest(BaseQosTest):
self.adm_qos_client.update_policy(policy['id'],
description='test policy desc2',
shared=True)
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
retrieved_policy = self.adm_qos_client.show_policy(policy['id'])
self.assertEqual('test policy desc2',
@ -238,6 +327,9 @@ class QosPolicyTest(BaseQosTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
retrieved_policy = self.adm_qos_client.show_policy(policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
self.assertEqual('test-policy', retrieved_policy['name'])
self.adm_qos_client.delete_policy(policy['id'])
@ -293,7 +385,10 @@ class QosPolicyTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_shared_network('test network',
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network('test-network',
qos_policy_id=policy['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
@ -310,7 +405,10 @@ class QosPolicyTest(BaseQosTest):
shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_network('test network',
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_network('test-network',
client_mgr=self.primary_mgr,
qos_policy_id=policy['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
@ -329,7 +427,7 @@ class QosPolicyTest(BaseQosTest):
self.assertRaises(
exceptions.NotFound,
self.create_network,
'test network',
'test-network',
qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')
@decorators.attr(type='negative')
@ -342,7 +440,7 @@ class QosPolicyTest(BaseQosTest):
self.assertRaises(
exceptions.NotFound,
self.create_network,
'test network', qos_policy_id=policy['id'],
'test-network', qos_policy_id=policy['id'],
client_mgr=self.primary_mgr)
@decorators.idempotent_id('10a9392c-1359-4cbb-989f-fb768e5834a8')
@ -353,7 +451,10 @@ class QosPolicyTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_shared_network('test network')
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network('test-network')
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
retrieved_network = self.show_network(network['id'])
@ -384,11 +485,17 @@ class QosPolicyTest(BaseQosTest):
shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_shared_network('test network')
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network('test-network')
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
port = self.create_port(network, qos_policy_id=policy['id'],
client_mgr=self.primary_mgr)
#check backend if the port qos profile is updated
if CONF.network.backend == 'nsxp':
self.verify_backend_port(policy, network, port)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_port, port['id'])
retrieved_port = self.show_port(port['id'],
@ -403,7 +510,7 @@ class QosPolicyTest(BaseQosTest):
@decorators.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
def test_policy_association_with_port_nonexistent_policy(self):
"""test port cannot be created with nonexist policy."""
network = self.create_shared_network('test network')
network = self.create_shared_network('test-network')
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
self.assertRaises(
@ -422,7 +529,10 @@ class QosPolicyTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_shared_network('test network')
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network('test-network')
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
self.assertRaises(
@ -440,7 +550,10 @@ class QosPolicyTest(BaseQosTest):
shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_shared_network('test network')
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network('test-network')
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
port = self.create_port(network, client_mgr=self.primary_mgr)
@ -454,6 +567,9 @@ class QosPolicyTest(BaseQosTest):
client_mgr=self.primary_mgr)
retrieved_port = self.show_port(port['id'],
client_mgr=self.primary_mgr)
#check backend if the port qos profile is updated
if CONF.network.backend == 'nsxp':
self.verify_backend_port(policy, network, port)
self.assertEqual(
policy['id'], retrieved_port['qos_policy_id'])
@ -468,8 +584,11 @@ class QosPolicyTest(BaseQosTest):
shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network(
'test network', qos_policy_id=policy['id'])
'test-network', qos_policy_id=policy['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
self.assertRaises(
@ -489,13 +608,19 @@ class QosPolicyTest(BaseQosTest):
shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
network = self.create_shared_network('test network')
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
network = self.create_shared_network('test-network')
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_network, network['id'])
port = self.create_port(network, qos_policy_id=policy['id'],
client_mgr=self.primary_mgr)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_port, port['id'])
#check backend if the port qos profile is updated
if CONF.network.backend == 'nsxp':
self.verify_backend_port(policy, network, port)
self.assertRaises(
exceptions.Conflict,
self.adm_qos_client.delete_policy, policy['id'])
@ -511,6 +636,9 @@ class QosPolicyTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
self.adm_qos_client.create_bandwidth_limit_rule(
policy['id'], 2000, 1337)
@ -532,12 +660,18 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337)
# Test 'show rule'
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, rule)
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(2000, retrieved_rule['max_kbps'])
self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
@ -563,6 +697,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337,
direction='ingress')
@ -570,6 +707,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
# Test 'show rule'
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, rule)
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(2000, retrieved_rule['max_kbps'])
self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
@ -597,12 +737,18 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
egress_rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337,
direction='egress')
ingress_rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337,
direction='ingress')
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rules(policy, ingress_rule, 1)
# Test 'show rule'
retrieved_rule = qos_client.show_bandwidth_limit_rule(
egress_rule['id'], policy['id'])
@ -650,6 +796,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1000)
@ -659,6 +808,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule)
self.assertEqual(max_kbps, retrieved_rule['max_kbps'])
self.assertEqual(max_burst_kbps, retrieved_rule['max_burst_kbps'])
@ -673,6 +825,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1000,
direction='ingress')
@ -683,6 +838,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule)
self.assertEqual(max_kbps, retrieved_rule['max_kbps'])
self.assertEqual(max_burst_kbps, retrieved_rule['max_burst_kbps'])
self.assertEqual('ingress', retrieved_rule['direction'])
@ -696,6 +854,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1000,
direction='ingress')
@ -706,6 +867,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule)
self.assertEqual(2000, retrieved_rule['max_kbps'])
self.assertEqual(1000, retrieved_rule['max_burst_kbps'])
self.assertEqual('egress', retrieved_rule['direction'])
@ -721,6 +885,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy['id'],
max_kbps=max_kbps, max_burst_kbps=max_burst_kbps)
@ -728,6 +895,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
self.assertEqual(rule['id'], retrieved_rule['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule)
qos_client.delete_bandwidth_limit_rule(
rule['id'], policy['id'])
@ -746,6 +916,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule(
policy['id'],
max_kbps=max_kbps, max_burst_kbps=max_burst_kbps,
@ -754,6 +927,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id'])
self.assertEqual(rule['id'], retrieved_rule['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule)
qos_client.delete_bandwidth_limit_rule(
rule['id'], policy['id'])
@ -772,6 +948,9 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
egress_rule = self.create_qos_bandwidth_limit_rule(
policy['id'],
max_kbps=max_kbps, max_burst_kbps=max_burst_kbps,
@ -786,6 +965,14 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
ingress_retrieved_rule = qos_client.show_bandwidth_limit_rule(
ingress_rule['id'], policy['id'])
self.assertEqual(ingress_rule['id'], ingress_retrieved_rule['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rules(policy,
egress_retrieved_rule, 0)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rules(policy,
ingress_retrieved_rule, 1)
qos_client.delete_bandwidth_limit_rule(
egress_rule['id'], policy['id'])
self.assertRaises(exceptions.NotFound,
@ -834,16 +1021,28 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy1['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy1)
rule1 = self.create_qos_bandwidth_limit_rule(
policy_id=policy1['id'], max_kbps=2000, max_burst_kbps=1337)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy1, rule1)
policy2 = self.create_qos_policy(name='test-policy2',
description='test policy2',
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy2['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy2)
rule2 = self.create_qos_bandwidth_limit_rule(
policy_id=policy2['id'], max_kbps=5000, max_burst_kbps=2523)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy2, rule2)
# Test 'list rules'
rules = self.adm_qos_client.list_bandwidth_limit_rules(policy1['id'])
@ -867,9 +1066,16 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, rule)
# Test 'show rule'
retrieved_rule = qos_client.show_dscp_marking_rule(
rule['id'], policy['id'])
@ -898,8 +1104,14 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
self.create_qos_dscp_marking_rule(
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, rule)
self.assertRaises(exceptions.Conflict,
self.create_qos_dscp_marking_rule,
@ -915,6 +1127,9 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)
@ -923,6 +1138,9 @@ class QosDscpMarkingRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_dscp_marking_rule(
rule['id'], policy['id'])
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, retrieved_rule)
self.assertEqual(self.VALID_DSCP_MARK2, retrieved_rule['dscp_mark'])
@decorators.idempotent_id('67ed6efd-7b33-4a68-927d-275b4f8ba958')
@ -934,9 +1152,16 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, rule)
retrieved_rule = qos_client.show_dscp_marking_rule(
rule['id'], policy['id'])
self.assertEqual(rule['id'], retrieved_rule['id'])
@ -974,6 +1199,9 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy)
self.assertRaises(
exceptions.BadRequest,
self.create_qos_dscp_marking_rule,
@ -987,16 +1215,28 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy1['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy1)
rule1 = self.create_qos_dscp_marking_rule(
policy1['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy1, rule1)
policy2 = self.create_qos_policy(name='test-policy2',
description='test policy2',
shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy2['id'])
#check backend if the policy was created
if CONF.network.backend == 'nsxp':
self.verify_backend(policy2)
rule2 = self.create_qos_dscp_marking_rule(
policy2['id'], self.VALID_DSCP_MARK2)
#check backend if the rule was created
if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy2, rule2)
# Test 'list rules'
rules = self.adm_qos_client.list_dscp_marking_rules(policy1['id'])