Bug fixes

- bug id : 2399762
	- Added fix to deserialize json data
- bug id : 2399816
        - Added fix to avoid lb creation in resource setup, and create in each testcase
- bug id : 2399954
        - Added fix to avoid lb creation in resource setup, and create in each testcase

Change-Id: I413301b67cdec30320ea6531f757017cf57fa747
This commit is contained in:
shubhamk 2019-08-08 10:16:46 +00:00
parent b3eb03643e
commit b0ff86bb71
4 changed files with 226 additions and 113 deletions

View File

@ -423,7 +423,7 @@ class NSXPClient(object):
endpoint = "segments/%s/ports/%s/port-qos-profile-binding-maps" % ( endpoint = "segments/%s/ports/%s/port-qos-profile-binding-maps" % (
segment_id, port_id) segment_id, port_id)
response = self.get(endpoint) response = self.get(endpoint)
return response return response.json()['results']
def get_neutron_ns_group_id(self): def get_neutron_ns_group_id(self):
""" """

View File

@ -178,71 +178,81 @@ class BaseQosTest(base.BaseAdminNetworkTest):
def verify_backend(self, policy): def verify_backend(self, policy):
"""Verify backend NSXT for the policy created.""" """Verify backend NSXT for the policy created."""
#check backend if the policy was created # check backend if the policy was created
msg = 'QoS Policy %s not found' % policy['name'] msg = 'QoS Policy %s not found' % policy['name']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNotNone(self.nsxp.get_qos_profile( self.assertIsNotNone(self.nsxp.get_qos_profile(
policy['name'], policy['id']), msg) policy['name'], policy['id']), msg)
#Checking the MP backend for qos profiles # Checking the MP backend for qos profiles
self.assertIsNotNone(self.nsx.get_qos_profile( self.assertIsNotNone(self.nsx.get_qos_profile(
policy['name'], policy['id']), msg) policy['name'], policy['id']), msg)
def verify_backend_port(self, policy, network, port): def verify_backend_port(self, policy, network, port):
"""Verify backend NSXT port is updated with qos policy.""" """Verify backend NSXT port is updated with qos policy."""
#check backend if the policy was created # check backend if the policy was created
msg = 'QoS Policy %s not attached to the port' % policy['name'] msg = 'QoS Policy %s not attached to the port' % policy['name']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
segment = self.nsxp.get_logical_switch(network['name'], network['id']) segment = self.nsxp.get_logical_switch(network['name'], network['id'])
qos_policy = self.nsxp.get_port_qos_profile_binding_map(
segment['id'], port['id'])[0]['qos_profile_path'].split('/')
self.assertEqual(policy['id'], self.assertEqual(policy['id'],
self.nsxp.get_port_qos_profile_binding_map( qos_policy[len(qos_policy) - 1], msg)
segment['id'], port['id']), msg)
def verify_backend_bandwidth_rule(self, policy, rule): def verify_backend_bandwidth_rule(self, policy, rule):
"""Verify backend NSXT for the rule created.""" """Verify backend NSXT for the rule created."""
#check backend if the rule was created # check backend if the rule was created
msg = 'QoS Rule %s not found' % rule['id'] msg = 'QoS Rule %s not found' % rule['id']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id']) rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id'])
self.assertEqual((rule['max_kbps'] / 1000) * 2, self.assertEqual(
rule_backend['shaper_configurations'][0]['peak_bandwidth'], msg) (rule['max_kbps'] / 1000) * 2,
self.assertEqual((rule['max_kbps'] / 1000), rule_backend['shaper_configurations'][0]['peak_bandwidth'],
msg)
self.assertEqual(
(rule['max_kbps'] / 1000),
rule_backend['shaper_configurations'][0]['average_bandwidth'], msg) rule_backend['shaper_configurations'][0]['average_bandwidth'], msg)
if rule['direction'] == 'egress': if rule['direction'] == 'egress':
self.assertEqual('IngressRateLimiter', self.assertEqual(
'IngressRateLimiter',
rule_backend['shaper_configurations'][0]['resource_type'], msg) rule_backend['shaper_configurations'][0]['resource_type'], msg)
else: else:
self.assertEqual('EgressRateLimiter', self.assertEqual(
'EgressRateLimiter',
rule_backend['shaper_configurations'][0]['resource_type'], msg) rule_backend['shaper_configurations'][0]['resource_type'], msg)
def verify_backend_bandwidth_rules(self, policy, rule, index): def verify_backend_bandwidth_rules(self, policy, rule, index):
"""Verify backend NSXT for the rule created.""" """Verify backend NSXT for the rule created."""
#check backend if the rule was created # check backend if the rule was created
msg = 'QoS Rule %s not found' % rule['id'] msg = 'QoS Rule %s not found' % rule['id']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
rules_backend = self.nsxp.get_qos_profile(policy['name'], policy['id']) rules_backend = self.nsxp.get_qos_profile(policy['name'], policy['id'])
self.assertEqual((rule['max_kbps'] / 1000) * 2, self.assertEqual(
(rule['max_kbps'] / 1000) * 2,
rules_backend['shaper_configurations'][index]['peak_bandwidth'], rules_backend['shaper_configurations'][index]['peak_bandwidth'],
msg) msg)
self.assertEqual((rule['max_kbps'] / 1000), self.assertEqual(
(rule['max_kbps'] / 1000),
rules_backend['shaper_configurations'][index]['average_bandwidth'], rules_backend['shaper_configurations'][index]['average_bandwidth'],
msg) msg)
if rule['direction'] == 'egress': if rule['direction'] == 'egress':
self.assertEqual('IngressRateLimiter', self.assertEqual(
'IngressRateLimiter',
rules_backend['shaper_configurations'][index]['resource_type'], rules_backend['shaper_configurations'][index]['resource_type'],
msg) msg)
else: else:
self.assertEqual('EgressRateLimiter', self.assertEqual(
'EgressRateLimiter',
rules_backend['shaper_configurations'][index]['resource_type'], rules_backend['shaper_configurations'][index]['resource_type'],
msg) msg)
def verify_backend_dscp_rule(self, policy, rule): def verify_backend_dscp_rule(self, policy, rule):
"""Verify backend NSXT for the rule created.""" """Verify backend NSXT for the rule created."""
#check backend if the rule was created # check backend if the rule was created
msg = 'QoS Rule %s not found' % rule['id'] msg = 'QoS Rule %s not found' % rule['id']
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id']) rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id'])
self.assertEqual(rule['dscp_mark'], rule_backend['dscp']['priority'], self.assertEqual(rule['dscp_mark'], rule_backend['dscp']['priority'],
msg) msg)
class QosPolicyTest(BaseQosTest): class QosPolicyTest(BaseQosTest):
@ -266,7 +276,7 @@ class QosPolicyTest(BaseQosTest):
self.assertEqual('test policy desc1', self.assertEqual('test policy desc1',
retrieved_policy['description']) retrieved_policy['description'])
self.assertFalse(retrieved_policy['shared']) self.assertFalse(retrieved_policy['shared'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
# Test 'list policies' # Test 'list policies'
@ -283,14 +293,14 @@ class QosPolicyTest(BaseQosTest):
name=name1, description='test policy', shared=False) name=name1, description='test policy', shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy_name1['id']) self.adm_qos_client.delete_policy, policy_name1['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy_name1) self.verify_backend(policy_name1)
policy_name2 = self.create_qos_policy( policy_name2 = self.create_qos_policy(
name=name2, description='test policy', shared=False) name=name2, description='test policy', shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy_name2['id']) self.adm_qos_client.delete_policy, policy_name2['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy_name2) self.verify_backend(policy_name2)
policies = self.adm_qos_client.list_policies(name=name1) policies = self.adm_qos_client.list_policies(name=name1)
@ -310,7 +320,7 @@ class QosPolicyTest(BaseQosTest):
self.adm_qos_client.update_policy(policy['id'], self.adm_qos_client.update_policy(policy['id'],
description='test policy desc2', description='test policy desc2',
shared=True) shared=True)
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
@ -328,7 +338,7 @@ class QosPolicyTest(BaseQosTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
retrieved_policy = self.adm_qos_client.show_policy(policy['id']) retrieved_policy = self.adm_qos_client.show_policy(policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
self.assertEqual('test-policy', retrieved_policy['name']) self.assertEqual('test-policy', retrieved_policy['name'])
@ -386,7 +396,7 @@ class QosPolicyTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network('test-network', network = self.create_shared_network('test-network',
@ -406,7 +416,7 @@ class QosPolicyTest(BaseQosTest):
shared=True) shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_network('test-network', network = self.create_network('test-network',
@ -452,7 +462,7 @@ class QosPolicyTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network('test-network') network = self.create_shared_network('test-network')
@ -486,7 +496,7 @@ class QosPolicyTest(BaseQosTest):
shared=True) shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network('test-network') network = self.create_shared_network('test-network')
@ -494,7 +504,7 @@ class QosPolicyTest(BaseQosTest):
self.delete_network, network['id']) self.delete_network, network['id'])
port = self.create_port(network, qos_policy_id=policy['id'], port = self.create_port(network, qos_policy_id=policy['id'],
client_mgr=self.primary_mgr) client_mgr=self.primary_mgr)
#check backend if the port qos profile is updated # check backend if the port qos profile is updated
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_port(policy, network, port) self.verify_backend_port(policy, network, port)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
@ -530,7 +540,7 @@ class QosPolicyTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network('test-network') network = self.create_shared_network('test-network')
@ -551,7 +561,7 @@ class QosPolicyTest(BaseQosTest):
shared=True) shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network('test-network') network = self.create_shared_network('test-network')
@ -568,7 +578,7 @@ class QosPolicyTest(BaseQosTest):
client_mgr=self.primary_mgr) client_mgr=self.primary_mgr)
retrieved_port = self.show_port(port['id'], retrieved_port = self.show_port(port['id'],
client_mgr=self.primary_mgr) client_mgr=self.primary_mgr)
#check backend if the port qos profile is updated # check backend if the port qos profile is updated
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_port(policy, network, port) self.verify_backend_port(policy, network, port)
self.assertEqual( self.assertEqual(
@ -585,7 +595,7 @@ class QosPolicyTest(BaseQosTest):
shared=True) shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network( network = self.create_shared_network(
@ -609,7 +619,7 @@ class QosPolicyTest(BaseQosTest):
shared=True) shared=True)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
network = self.create_shared_network('test-network') network = self.create_shared_network('test-network')
@ -619,7 +629,7 @@ class QosPolicyTest(BaseQosTest):
client_mgr=self.primary_mgr) client_mgr=self.primary_mgr)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_port, port['id']) self.delete_port, port['id'])
#check backend if the port qos profile is updated # check backend if the port qos profile is updated
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_port(policy, network, port) self.verify_backend_port(policy, network, port)
self.assertRaises( self.assertRaises(
@ -637,7 +647,7 @@ class QosPolicyTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
self.adm_qos_client.create_bandwidth_limit_rule( self.adm_qos_client.create_bandwidth_limit_rule(
@ -661,7 +671,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -670,7 +680,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
# Test 'show rule' # Test 'show rule'
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, rule) self.verify_backend_bandwidth_rule(policy, rule)
self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(rule['id'], retrieved_rule['id'])
@ -698,7 +708,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -708,7 +718,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
# Test 'show rule' # Test 'show rule'
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, rule) self.verify_backend_bandwidth_rule(policy, rule)
self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(rule['id'], retrieved_rule['id'])
@ -738,7 +748,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
egress_rule = self.create_qos_bandwidth_limit_rule( egress_rule = self.create_qos_bandwidth_limit_rule(
@ -747,7 +757,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
ingress_rule = self.create_qos_bandwidth_limit_rule( ingress_rule = self.create_qos_bandwidth_limit_rule(
policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337, policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337,
direction='ingress') direction='ingress')
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rules(policy, ingress_rule, 1) self.verify_backend_bandwidth_rules(policy, ingress_rule, 1)
# Test 'show rule' # Test 'show rule'
@ -797,7 +807,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -809,7 +819,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.verify_backend_bandwidth_rule(policy, retrieved_rule)
self.assertEqual(max_kbps, retrieved_rule['max_kbps']) self.assertEqual(max_kbps, retrieved_rule['max_kbps'])
@ -826,7 +836,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -839,7 +849,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.verify_backend_bandwidth_rule(policy, retrieved_rule)
self.assertEqual(max_kbps, retrieved_rule['max_kbps']) self.assertEqual(max_kbps, retrieved_rule['max_kbps'])
@ -855,7 +865,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -868,7 +878,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.verify_backend_bandwidth_rule(policy, retrieved_rule)
self.assertEqual(2000, retrieved_rule['max_kbps']) self.assertEqual(2000, retrieved_rule['max_kbps'])
@ -886,7 +896,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -896,7 +906,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(rule['id'], retrieved_rule['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.verify_backend_bandwidth_rule(policy, retrieved_rule)
@ -917,7 +927,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_bandwidth_limit_rule( rule = self.create_qos_bandwidth_limit_rule(
@ -928,7 +938,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_bandwidth_limit_rule( retrieved_rule = qos_client.show_bandwidth_limit_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(rule['id'], retrieved_rule['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.verify_backend_bandwidth_rule(policy, retrieved_rule)
@ -949,7 +959,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
egress_rule = self.create_qos_bandwidth_limit_rule( egress_rule = self.create_qos_bandwidth_limit_rule(
@ -966,11 +976,11 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
ingress_retrieved_rule = qos_client.show_bandwidth_limit_rule( ingress_retrieved_rule = qos_client.show_bandwidth_limit_rule(
ingress_rule['id'], policy['id']) ingress_rule['id'], policy['id'])
self.assertEqual(ingress_rule['id'], ingress_retrieved_rule['id']) self.assertEqual(ingress_rule['id'], ingress_retrieved_rule['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rules(policy, self.verify_backend_bandwidth_rules(policy,
egress_retrieved_rule, 0) egress_retrieved_rule, 0)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rules(policy, self.verify_backend_bandwidth_rules(policy,
ingress_retrieved_rule, 1) ingress_retrieved_rule, 1)
@ -1022,12 +1032,12 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy1['id']) self.adm_qos_client.delete_policy, policy1['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy1) self.verify_backend(policy1)
rule1 = self.create_qos_bandwidth_limit_rule( rule1 = self.create_qos_bandwidth_limit_rule(
policy_id=policy1['id'], max_kbps=2000, max_burst_kbps=1337) policy_id=policy1['id'], max_kbps=2000, max_burst_kbps=1337)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy1, rule1) self.verify_backend_bandwidth_rule(policy1, rule1)
@ -1036,12 +1046,12 @@ class QosBandwidthLimitRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy2['id']) self.adm_qos_client.delete_policy, policy2['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy2) self.verify_backend(policy2)
rule2 = self.create_qos_bandwidth_limit_rule( rule2 = self.create_qos_bandwidth_limit_rule(
policy_id=policy2['id'], max_kbps=5000, max_burst_kbps=2523) policy_id=policy2['id'], max_kbps=5000, max_burst_kbps=2523)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_bandwidth_rule(policy2, rule2) self.verify_backend_bandwidth_rule(policy2, rule2)
@ -1067,13 +1077,13 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule( rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1) policy['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, rule) self.verify_backend_dscp_rule(policy, rule)
@ -1105,12 +1115,12 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule( rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1) policy['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, rule) self.verify_backend_dscp_rule(policy, rule)
@ -1128,7 +1138,7 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule( rule = self.create_qos_dscp_marking_rule(
@ -1139,7 +1149,7 @@ class QosDscpMarkingRuleTest(BaseQosTest):
retrieved_rule = qos_client.show_dscp_marking_rule( retrieved_rule = qos_client.show_dscp_marking_rule(
rule['id'], policy['id']) rule['id'], policy['id'])
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, retrieved_rule) self.verify_backend_dscp_rule(policy, retrieved_rule)
self.assertEqual(self.VALID_DSCP_MARK2, retrieved_rule['dscp_mark']) self.assertEqual(self.VALID_DSCP_MARK2, retrieved_rule['dscp_mark'])
@ -1153,13 +1163,13 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
rule = self.create_qos_dscp_marking_rule( rule = self.create_qos_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1) policy['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy, rule) self.verify_backend_dscp_rule(policy, rule)
@ -1200,7 +1210,7 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy['id']) self.adm_qos_client.delete_policy, policy['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy) self.verify_backend(policy)
self.assertRaises( self.assertRaises(
@ -1216,12 +1226,12 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy1['id']) self.adm_qos_client.delete_policy, policy1['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy1) self.verify_backend(policy1)
rule1 = self.create_qos_dscp_marking_rule( rule1 = self.create_qos_dscp_marking_rule(
policy1['id'], self.VALID_DSCP_MARK1) policy1['id'], self.VALID_DSCP_MARK1)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy1, rule1) self.verify_backend_dscp_rule(policy1, rule1)
@ -1230,12 +1240,12 @@ class QosDscpMarkingRuleTest(BaseQosTest):
shared=False) shared=False)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.adm_qos_client.delete_policy, policy2['id']) self.adm_qos_client.delete_policy, policy2['id'])
#check backend if the policy was created # check backend if the policy was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend(policy2) self.verify_backend(policy2)
rule2 = self.create_qos_dscp_marking_rule( rule2 = self.create_qos_dscp_marking_rule(
policy2['id'], self.VALID_DSCP_MARK2) policy2['id'], self.VALID_DSCP_MARK2)
#check backend if the rule was created # check backend if the rule was created
if CONF.network.backend == 'nsxp': if CONF.network.backend == 'nsxp':
self.verify_backend_dscp_rule(policy2, rule2) self.verify_backend_dscp_rule(policy2, rule2)

View File

@ -42,9 +42,6 @@ class LoadBalancersTest(base.BaseAdminTestCase):
super(LoadBalancersTest, cls).resource_setup() super(LoadBalancersTest, cls).resource_setup()
cls.create_lb_kwargs = {'tenant_id': cls.subnet['tenant_id'], cls.create_lb_kwargs = {'tenant_id': cls.subnet['tenant_id'],
'vip_subnet_id': cls.subnet['id']} 'vip_subnet_id': cls.subnet['id']}
cls.load_balancer = \
cls._create_active_load_balancer(**cls.create_lb_kwargs)
cls.load_balancer_id = cls.load_balancer['id']
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@testtools.skipIf('1641902' in CONF.nsxv.bugs_to_resolve, @testtools.skipIf('1641902' in CONF.nsxv.bugs_to_resolve,

View File

@ -16,6 +16,7 @@ from oslo_log import log as logging
import testtools import testtools
from tempest import config from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib import exceptions from tempest.lib import exceptions
@ -44,52 +45,89 @@ class LoadBalancersTest(base.BaseTestCase):
super(LoadBalancersTest, cls).resource_setup() super(LoadBalancersTest, cls).resource_setup()
cls.create_lb_kwargs = {'tenant_id': cls.subnet['tenant_id'], cls.create_lb_kwargs = {'tenant_id': cls.subnet['tenant_id'],
'vip_subnet_id': cls.subnet['id']} 'vip_subnet_id': cls.subnet['id']}
cls.load_balancer = \
cls._create_active_load_balancer(**cls.create_lb_kwargs)
cls.load_balancer_id = cls.load_balancer['id']
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@decorators.idempotent_id('b7ea6c09-e077-4a67-859b-b2cd01e3b46b') @decorators.idempotent_id('b7ea6c09-e077-4a67-859b-b2cd01e3b46b')
def test_list_load_balancers(self): def test_list_load_balancers(self):
"""Test list load balancers with one load balancer""" """Test list load balancers with one load balancer"""
load_balancer = \
self._create_active_load_balancer(**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
load_balancers = self._list_load_balancers() load_balancers = self._list_load_balancers()
self.assertEqual(len(load_balancers), 1) self.assertEqual(len(load_balancers), 1)
self.assertIn(self.load_balancer, load_balancers) self.assertIn(load_balancer, load_balancers)
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@decorators.idempotent_id('8c2302df-ca94-4950-9826-eb996630a392') @decorators.idempotent_id('8c2302df-ca94-4950-9826-eb996630a392')
def test_list_load_balancers_two(self): def test_list_load_balancers_two(self):
"""Test list load balancers with two load balancers""" """Test list load balancers with two load balancers"""
new_load_balancer = self._create_active_load_balancer( load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs) **self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
rand_number = data_utils.rand_name()
network_name = 'lbaas-network-' + rand_number
router_name = 'lbaas-router-' + rand_number
network = self.create_network(network_name)
subnet = self.create_subnet(network)
router_cfg = dict(
router_name=router_name,
external_network_id=CONF.network.public_network_id)
router = self.create_router(**router_cfg)
self.create_router_interface(router['id'], subnet['id'])
create_lb_kwargs = {'tenant_id': subnet['tenant_id'],
'vip_subnet_id': subnet['id']}
new_load_balancer = self._create_active_load_balancer(
**create_lb_kwargs)
new_load_balancer_id = new_load_balancer['id'] new_load_balancer_id = new_load_balancer['id']
self.addCleanup(self._delete_load_balancer, new_load_balancer_id) self.addCleanup(self._delete_load_balancer, new_load_balancer_id)
load_balancers = self._list_load_balancers() load_balancers = self._list_load_balancers()
self.assertEqual(len(load_balancers), 2) self.assertEqual(len(load_balancers), 2)
self.assertIn(self.load_balancer, load_balancers) self.assertIn(load_balancer, load_balancers)
self.assertIn(new_load_balancer, load_balancers) self.assertIn(new_load_balancer, load_balancers)
self.assertNotEqual(self.load_balancer, new_load_balancer) self.assertNotEqual(load_balancer, new_load_balancer)
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@decorators.idempotent_id('56345a78-1d53-4c05-9d7b-3e5cf34c22aa') @decorators.idempotent_id('56345a78-1d53-4c05-9d7b-3e5cf34c22aa')
def test_get_load_balancer(self): def test_get_load_balancer(self):
"""Test get load balancer""" """Test get load balancer"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
load_balancer = self._show_load_balancer( load_balancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
self.assertEqual(self.load_balancer, load_balancer) self.assertEqual(load_balancer, load_balancer)
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@decorators.idempotent_id('5bf80330-d908-4025-9467-bca1727525c8') @decorators.idempotent_id('5bf80330-d908-4025-9467-bca1727525c8')
def test_create_load_balancer(self): def test_create_load_balancer(self):
"""Test create load balancer""" """Test create load balancer"""
new_load_balancer = self._create_active_load_balancer( load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs) **self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
rand_number = data_utils.rand_name()
network_name = 'lbaas-network-' + rand_number
router_name = 'lbaas-router-' + rand_number
network = self.create_network(network_name)
subnet = self.create_subnet(network)
router_cfg = dict(
router_name=router_name,
external_network_id=CONF.network.public_network_id)
router = self.create_router(**router_cfg)
self.create_router_interface(router['id'], subnet['id'])
create_lb_kwargs = {'tenant_id': subnet['tenant_id'],
'vip_subnet_id': subnet['id']}
new_load_balancer = self._create_active_load_balancer(
**create_lb_kwargs)
new_load_balancer_id = new_load_balancer['id'] new_load_balancer_id = new_load_balancer['id']
self.addCleanup(self._delete_load_balancer, new_load_balancer_id) self.addCleanup(self._delete_load_balancer, new_load_balancer_id)
load_balancer = self._show_load_balancer( load_balancer_new = self._show_load_balancer(
new_load_balancer_id) new_load_balancer_id)
self.assertEqual(new_load_balancer, load_balancer) self.assertEqual(new_load_balancer, load_balancer_new)
self.assertNotEqual(self.load_balancer, new_load_balancer) self.assertNotEqual(load_balancer, new_load_balancer)
@decorators.attr(type='negative') @decorators.attr(type='negative')
@decorators.idempotent_id('66bf5390-154f-4627-af61-2c1c30325d6f') @decorators.idempotent_id('66bf5390-154f-4627-af61-2c1c30325d6f')
@ -337,20 +375,28 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('1d92d98f-550f-4f05-a246-cdf4525459a2') @decorators.idempotent_id('1d92d98f-550f-4f05-a246-cdf4525459a2')
def test_update_load_balancer(self): def test_update_load_balancer(self):
"""Test update load balancer""" """Test update load balancer"""
self._update_load_balancer(self.load_balancer_id, load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self._update_load_balancer(load_balancer_id,
name='new_name') name='new_name')
load_balancer = self._show_load_balancer( load_balancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
self.assertEqual(load_balancer.get('name'), 'new_name') self.assertEqual(load_balancer.get('name'), 'new_name')
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@decorators.idempotent_id('474ca200-8dea-4d20-8468-abc0169a445b') @decorators.idempotent_id('474ca200-8dea-4d20-8468-abc0169a445b')
def test_update_load_balancer_empty_name(self): def test_update_load_balancer_empty_name(self):
"""Test update load balancer with empty name""" """Test update load balancer with empty name"""
self._update_load_balancer(self.load_balancer_id, load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self._update_load_balancer(load_balancer_id,
name="") name="")
load_balancer = self._show_load_balancer( load_balancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
self.assertEqual(load_balancer.get('name'), "") self.assertEqual(load_balancer.get('name'), "")
@decorators.attr(type='negative') @decorators.attr(type='negative')
@ -360,9 +406,13 @@ class LoadBalancersTest(base.BaseTestCase):
Kilo: @decorators.skip_because(bug="1637877") Kilo: @decorators.skip_because(bug="1637877")
""" """
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self.assertRaises(exceptions.BadRequest, self.assertRaises(exceptions.BadRequest,
self._update_load_balancer, self._update_load_balancer,
load_balancer_id=self.load_balancer_id, load_balancer_id=load_balancer_id,
wait=False, wait=False,
name='a' * 256) name='a' * 256)
@ -370,12 +420,16 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('62eef0ba-3859-4c8f-9e6a-8d6918754597') @decorators.idempotent_id('62eef0ba-3859-4c8f-9e6a-8d6918754597')
def test_update_load_balancer_missing_name(self): def test_update_load_balancer_missing_name(self):
"""Test update load balancer with missing name""" """Test update load balancer with missing name"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
loadbalancer = self._show_load_balancer( loadbalancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
load_balancer_initial = loadbalancer['name'] load_balancer_initial = loadbalancer['name']
self._update_load_balancer(self.load_balancer_id) self._update_load_balancer(load_balancer_id)
load_balancer = self._show_load_balancer( load_balancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
load_balancer_new = load_balancer['name'] load_balancer_new = load_balancer['name']
self.assertEqual(load_balancer_initial, load_balancer_new) self.assertEqual(load_balancer_initial, load_balancer_new)
@ -386,9 +440,13 @@ class LoadBalancersTest(base.BaseTestCase):
Kilo: @decorators.skip_because(bug="1637877") Kilo: @decorators.skip_because(bug="1637877")
""" """
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self.assertRaises(exceptions.BadRequest, self.assertRaises(exceptions.BadRequest,
self._update_load_balancer, self._update_load_balancer,
load_balancer_id=self.load_balancer_id, load_balancer_id=load_balancer_id,
wait=False, wait=False,
description='a' * 256) description='a' * 256)
@ -396,22 +454,30 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('157ebdbf-4ad2-495d-b880-c1b1a8edc46d') @decorators.idempotent_id('157ebdbf-4ad2-495d-b880-c1b1a8edc46d')
def test_update_load_balancer_empty_description(self): def test_update_load_balancer_empty_description(self):
"""Test update load balancer with empty description""" """Test update load balancer with empty description"""
self._update_load_balancer(self.load_balancer_id, load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self._update_load_balancer(load_balancer_id,
description="") description="")
load_balancer = self._show_load_balancer( load_balancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
self.assertEqual(load_balancer.get('description'), "") self.assertEqual(load_balancer.get('description'), "")
@decorators.attr(type='smoke') @decorators.attr(type='smoke')
@decorators.idempotent_id('d13fa2f5-e8df-4d53-86a8-68583941200c') @decorators.idempotent_id('d13fa2f5-e8df-4d53-86a8-68583941200c')
def test_update_load_balancer_missing_description(self): def test_update_load_balancer_missing_description(self):
"""Test update load balancer with missing description""" """Test update load balancer with missing description"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
loadbalancer = self._show_load_balancer( loadbalancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
load_balancer_initial = loadbalancer['description'] load_balancer_initial = loadbalancer['description']
self._update_load_balancer(self.load_balancer_id) self._update_load_balancer(load_balancer_id)
load_balancer = self._show_load_balancer( load_balancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
load_balancer_new = load_balancer['description'] load_balancer_new = load_balancer['description']
self.assertEqual(load_balancer_initial, load_balancer_new) self.assertEqual(load_balancer_initial, load_balancer_new)
@ -419,9 +485,13 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('96e46a1a-62e7-47f1-98c5-9983f89e622f') @decorators.idempotent_id('96e46a1a-62e7-47f1-98c5-9983f89e622f')
def test_update_load_balancer_invalid_admin_state_up_field(self): def test_update_load_balancer_invalid_admin_state_up_field(self):
"""Test update load balancer with an invalid admin_state_up""" """Test update load balancer with an invalid admin_state_up"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self.assertRaises(exceptions.BadRequest, self.assertRaises(exceptions.BadRequest,
self._update_load_balancer, self._update_load_balancer,
load_balancer_id=self.load_balancer_id, load_balancer_id=load_balancer_id,
wait=False, wait=False,
admin_state_up="a&^%$jbc123") admin_state_up="a&^%$jbc123")
@ -429,9 +499,13 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('48f1e227-8b15-4389-a050-7ce76f4b4d46') @decorators.idempotent_id('48f1e227-8b15-4389-a050-7ce76f4b4d46')
def test_update_load_balancer_empty_admin_state_up_field(self): def test_update_load_balancer_empty_admin_state_up_field(self):
"""Test update load balancer with an empty admin_state_up""" """Test update load balancer with an empty admin_state_up"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self.assertRaises(exceptions.BadRequest, self.assertRaises(exceptions.BadRequest,
self._update_load_balancer, self._update_load_balancer,
load_balancer_id=self.load_balancer_id, load_balancer_id=load_balancer_id,
wait=False, wait=False,
admin_state_up="") admin_state_up="")
@ -439,19 +513,27 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('a9182e53-ddaa-4f41-af54-585d983279ba') @decorators.idempotent_id('a9182e53-ddaa-4f41-af54-585d983279ba')
def test_update_load_balancer_missing_admin_state_up(self): def test_update_load_balancer_missing_admin_state_up(self):
"""Test update load balancer with missing admin state field""" """Test update load balancer with missing admin state field"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
loadbalancer = self._show_load_balancer( loadbalancer = self._show_load_balancer(
self.load_balancer_id) load_balancer_id)
load_balancer_initial = loadbalancer['admin_state_up'] load_balancer_initial = loadbalancer['admin_state_up']
self._update_load_balancer(self.load_balancer_id) self._update_load_balancer(load_balancer_id)
self.assertEqual(load_balancer_initial, True) self.assertEqual(load_balancer_initial, True)
@decorators.attr(type='negative') @decorators.attr(type='negative')
@decorators.idempotent_id('bfbe9339-d083-4a88-b6d6-015522809c3a') @decorators.idempotent_id('bfbe9339-d083-4a88-b6d6-015522809c3a')
def test_update_load_balancer_incorrect_attribute(self): def test_update_load_balancer_incorrect_attribute(self):
"""Test update a load balancer with an extra, invalid attribute""" """Test update a load balancer with an extra, invalid attribute"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
self.assertRaises(exceptions.BadRequest, self.assertRaises(exceptions.BadRequest,
self._update_load_balancer, self._update_load_balancer,
load_balancer_id=self.load_balancer_id, load_balancer_id=load_balancer_id,
wait=False, wait=False,
name="lb_name", name="lb_name",
description="lb_name_description", description="lb_name_description",
@ -462,8 +544,12 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('d2258984-6e9a-41d6-bffa-0543c8b1f2b0') @decorators.idempotent_id('d2258984-6e9a-41d6-bffa-0543c8b1f2b0')
def test_get_load_balancer_status_tree(self): def test_get_load_balancer_status_tree(self):
"""Test get load balancer status tree""" """Test get load balancer status tree"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
statuses = self._show_load_balancer_status_tree( statuses = self._show_load_balancer_status_tree(
self.load_balancer_id) load_balancer_id)
load_balancer = statuses['loadbalancer'] load_balancer = statuses['loadbalancer']
self.assertEqual("ONLINE", load_balancer['operating_status']) self.assertEqual("ONLINE", load_balancer['operating_status'])
self.assertEqual("ACTIVE", load_balancer['provisioning_status']) self.assertEqual("ACTIVE", load_balancer['provisioning_status'])
@ -473,8 +559,12 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('a23677a9-b770-4894-8be9-cd66590c228b') @decorators.idempotent_id('a23677a9-b770-4894-8be9-cd66590c228b')
def test_get_load_balancer_stats(self): def test_get_load_balancer_stats(self):
"""Test get load balancer stats""" """Test get load balancer stats"""
load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
stats = self._show_load_balancer_stats( stats = self._show_load_balancer_stats(
self.load_balancer_id) load_balancer_id)
self.assertEqual(0, stats['bytes_in']) self.assertEqual(0, stats['bytes_in'])
self.assertEqual(0, stats['bytes_out']) self.assertEqual(0, stats['bytes_out'])
self.assertEqual(0, stats['total_connections']) self.assertEqual(0, stats['total_connections'])
@ -484,13 +574,29 @@ class LoadBalancersTest(base.BaseTestCase):
@decorators.idempotent_id('f289f8df-a867-45cd-bee3-7ff08f5e96e0') @decorators.idempotent_id('f289f8df-a867-45cd-bee3-7ff08f5e96e0')
def test_delete_load_balancer(self): def test_delete_load_balancer(self):
"""Test delete load balancer""" """Test delete load balancer"""
new_load_balancer = self._create_active_load_balancer( load_balancer = self._create_active_load_balancer(
**self.create_lb_kwargs) **self.create_lb_kwargs)
load_balancer_id = load_balancer['id']
self.addCleanup(self._delete_load_balancer, load_balancer_id)
rand_number = data_utils.rand_name()
network_name = 'lbaas-network-' + rand_number
router_name = 'lbaas-router-' + rand_number
network = self.create_network(network_name)
subnet = self.create_subnet(network)
router_cfg = dict(
router_name=router_name,
external_network_id=CONF.network.public_network_id)
router = self.create_router(**router_cfg)
self.create_router_interface(router['id'], subnet['id'])
create_lb_kwargs = {'tenant_id': subnet['tenant_id'],
'vip_subnet_id': subnet['id']}
new_load_balancer = self._create_active_load_balancer(
**create_lb_kwargs)
new_load_balancer_id = new_load_balancer['id'] new_load_balancer_id = new_load_balancer['id']
load_balancer = self._show_load_balancer( load_balancer_new = self._show_load_balancer(
new_load_balancer_id) new_load_balancer_id)
self.assertEqual(new_load_balancer, load_balancer) self.assertEqual(new_load_balancer, load_balancer_new)
self.assertNotEqual(self.load_balancer, new_load_balancer) self.assertNotEqual(load_balancer, new_load_balancer)
self._delete_load_balancer(new_load_balancer_id) self._delete_load_balancer(new_load_balancer_id)
self.assertRaises(exceptions.NotFound, self.assertRaises(exceptions.NotFound,
self._show_load_balancer, self._show_load_balancer,