diff --git a/vmware_nsx_tempest_plugin/services/nsxp_client.py b/vmware_nsx_tempest_plugin/services/nsxp_client.py index d44f461..afc4535 100644 --- a/vmware_nsx_tempest_plugin/services/nsxp_client.py +++ b/vmware_nsx_tempest_plugin/services/nsxp_client.py @@ -423,7 +423,7 @@ class NSXPClient(object): endpoint = "segments/%s/ports/%s/port-qos-profile-binding-maps" % ( segment_id, port_id) response = self.get(endpoint) - return response + return response.json()['results'] def get_neutron_ns_group_id(self): """ diff --git a/vmware_nsx_tempest_plugin/tests/api/test_qos.py b/vmware_nsx_tempest_plugin/tests/api/test_qos.py index 85dd8cd..c27f223 100644 --- a/vmware_nsx_tempest_plugin/tests/api/test_qos.py +++ b/vmware_nsx_tempest_plugin/tests/api/test_qos.py @@ -178,71 +178,81 @@ class BaseQosTest(base.BaseAdminNetworkTest): def verify_backend(self, policy): """Verify backend NSXT for the policy created.""" - #check backend if the policy was created + # check backend if the policy was created msg = 'QoS Policy %s not found' % policy['name'] time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) self.assertIsNotNone(self.nsxp.get_qos_profile( policy['name'], policy['id']), msg) - #Checking the MP backend for qos profiles + # Checking the MP backend for qos profiles self.assertIsNotNone(self.nsx.get_qos_profile( policy['name'], policy['id']), msg) def verify_backend_port(self, policy, network, port): """Verify backend NSXT port is updated with qos policy.""" - #check backend if the policy was created + # check backend if the policy was created msg = 'QoS Policy %s not attached to the port' % policy['name'] time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) segment = self.nsxp.get_logical_switch(network['name'], network['id']) + qos_policy = self.nsxp.get_port_qos_profile_binding_map( + segment['id'], port['id'])[0]['qos_profile_path'].split('/') self.assertEqual(policy['id'], - self.nsxp.get_port_qos_profile_binding_map( - segment['id'], port['id']), msg) + qos_policy[len(qos_policy) - 1], msg) def verify_backend_bandwidth_rule(self, policy, rule): """Verify backend NSXT for the rule created.""" - #check backend if the rule was created + # check backend if the rule was created msg = 'QoS Rule %s not found' % rule['id'] time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id']) - self.assertEqual((rule['max_kbps'] / 1000) * 2, - rule_backend['shaper_configurations'][0]['peak_bandwidth'], msg) - self.assertEqual((rule['max_kbps'] / 1000), + self.assertEqual( + (rule['max_kbps'] / 1000) * 2, + rule_backend['shaper_configurations'][0]['peak_bandwidth'], + msg) + self.assertEqual( + (rule['max_kbps'] / 1000), rule_backend['shaper_configurations'][0]['average_bandwidth'], msg) if rule['direction'] == 'egress': - self.assertEqual('IngressRateLimiter', + self.assertEqual( + 'IngressRateLimiter', rule_backend['shaper_configurations'][0]['resource_type'], msg) else: - self.assertEqual('EgressRateLimiter', + self.assertEqual( + 'EgressRateLimiter', rule_backend['shaper_configurations'][0]['resource_type'], msg) def verify_backend_bandwidth_rules(self, policy, rule, index): """Verify backend NSXT for the rule created.""" - #check backend if the rule was created + # check backend if the rule was created msg = 'QoS Rule %s not found' % rule['id'] time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) rules_backend = self.nsxp.get_qos_profile(policy['name'], policy['id']) - self.assertEqual((rule['max_kbps'] / 1000) * 2, + self.assertEqual( + (rule['max_kbps'] / 1000) * 2, rules_backend['shaper_configurations'][index]['peak_bandwidth'], msg) - self.assertEqual((rule['max_kbps'] / 1000), + self.assertEqual( + (rule['max_kbps'] / 1000), rules_backend['shaper_configurations'][index]['average_bandwidth'], msg) if rule['direction'] == 'egress': - self.assertEqual('IngressRateLimiter', + self.assertEqual( + 'IngressRateLimiter', rules_backend['shaper_configurations'][index]['resource_type'], msg) else: - self.assertEqual('EgressRateLimiter', + self.assertEqual( + 'EgressRateLimiter', rules_backend['shaper_configurations'][index]['resource_type'], msg) def verify_backend_dscp_rule(self, policy, rule): """Verify backend NSXT for the rule created.""" - #check backend if the rule was created + # check backend if the rule was created msg = 'QoS Rule %s not found' % rule['id'] time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL) rule_backend = self.nsxp.get_qos_profile(policy['name'], policy['id']) self.assertEqual(rule['dscp_mark'], rule_backend['dscp']['priority'], - msg) + msg) class QosPolicyTest(BaseQosTest): @@ -266,7 +276,7 @@ class QosPolicyTest(BaseQosTest): self.assertEqual('test policy desc1', retrieved_policy['description']) self.assertFalse(retrieved_policy['shared']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) # Test 'list policies' @@ -283,14 +293,14 @@ class QosPolicyTest(BaseQosTest): name=name1, description='test policy', shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy_name1['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy_name1) policy_name2 = self.create_qos_policy( name=name2, description='test policy', shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy_name2['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy_name2) policies = self.adm_qos_client.list_policies(name=name1) @@ -310,7 +320,7 @@ class QosPolicyTest(BaseQosTest): self.adm_qos_client.update_policy(policy['id'], description='test policy desc2', shared=True) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) @@ -328,7 +338,7 @@ class QosPolicyTest(BaseQosTest): self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) retrieved_policy = self.adm_qos_client.show_policy(policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) self.assertEqual('test-policy', retrieved_policy['name']) @@ -386,7 +396,7 @@ class QosPolicyTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network('test-network', @@ -406,7 +416,7 @@ class QosPolicyTest(BaseQosTest): shared=True) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_network('test-network', @@ -452,7 +462,7 @@ class QosPolicyTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network('test-network') @@ -486,7 +496,7 @@ class QosPolicyTest(BaseQosTest): shared=True) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network('test-network') @@ -494,7 +504,7 @@ class QosPolicyTest(BaseQosTest): self.delete_network, network['id']) port = self.create_port(network, qos_policy_id=policy['id'], client_mgr=self.primary_mgr) - #check backend if the port qos profile is updated + # check backend if the port qos profile is updated if CONF.network.backend == 'nsxp': self.verify_backend_port(policy, network, port) self.addCleanup(test_utils.call_and_ignore_notfound_exc, @@ -530,7 +540,7 @@ class QosPolicyTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network('test-network') @@ -551,7 +561,7 @@ class QosPolicyTest(BaseQosTest): shared=True) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network('test-network') @@ -568,7 +578,7 @@ class QosPolicyTest(BaseQosTest): client_mgr=self.primary_mgr) retrieved_port = self.show_port(port['id'], client_mgr=self.primary_mgr) - #check backend if the port qos profile is updated + # check backend if the port qos profile is updated if CONF.network.backend == 'nsxp': self.verify_backend_port(policy, network, port) self.assertEqual( @@ -585,7 +595,7 @@ class QosPolicyTest(BaseQosTest): shared=True) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network( @@ -609,7 +619,7 @@ class QosPolicyTest(BaseQosTest): shared=True) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) network = self.create_shared_network('test-network') @@ -619,7 +629,7 @@ class QosPolicyTest(BaseQosTest): client_mgr=self.primary_mgr) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.delete_port, port['id']) - #check backend if the port qos profile is updated + # check backend if the port qos profile is updated if CONF.network.backend == 'nsxp': self.verify_backend_port(policy, network, port) self.assertRaises( @@ -637,7 +647,7 @@ class QosPolicyTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) self.adm_qos_client.create_bandwidth_limit_rule( @@ -661,7 +671,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -670,7 +680,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): # Test 'show rule' retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, rule) self.assertEqual(rule['id'], retrieved_rule['id']) @@ -698,7 +708,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -708,7 +718,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): # Test 'show rule' retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, rule) self.assertEqual(rule['id'], retrieved_rule['id']) @@ -738,7 +748,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) egress_rule = self.create_qos_bandwidth_limit_rule( @@ -747,7 +757,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): ingress_rule = self.create_qos_bandwidth_limit_rule( policy_id=policy['id'], max_kbps=2000, max_burst_kbps=1337, direction='ingress') - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rules(policy, ingress_rule, 1) # Test 'show rule' @@ -797,7 +807,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -809,7 +819,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.assertEqual(max_kbps, retrieved_rule['max_kbps']) @@ -826,7 +836,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -839,7 +849,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.assertEqual(max_kbps, retrieved_rule['max_kbps']) @@ -855,7 +865,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -868,7 +878,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, retrieved_rule) self.assertEqual(2000, retrieved_rule['max_kbps']) @@ -886,7 +896,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -896,7 +906,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) self.assertEqual(rule['id'], retrieved_rule['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, retrieved_rule) @@ -917,7 +927,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_bandwidth_limit_rule( @@ -928,7 +938,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): retrieved_rule = qos_client.show_bandwidth_limit_rule( rule['id'], policy['id']) self.assertEqual(rule['id'], retrieved_rule['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy, retrieved_rule) @@ -949,7 +959,7 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) egress_rule = self.create_qos_bandwidth_limit_rule( @@ -966,11 +976,11 @@ class QosBandwidthLimitRuleTest(BaseQosTest): ingress_retrieved_rule = qos_client.show_bandwidth_limit_rule( ingress_rule['id'], policy['id']) self.assertEqual(ingress_rule['id'], ingress_retrieved_rule['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rules(policy, egress_retrieved_rule, 0) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rules(policy, ingress_retrieved_rule, 1) @@ -1022,12 +1032,12 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy1['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy1) rule1 = self.create_qos_bandwidth_limit_rule( policy_id=policy1['id'], max_kbps=2000, max_burst_kbps=1337) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy1, rule1) @@ -1036,12 +1046,12 @@ class QosBandwidthLimitRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy2['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy2) rule2 = self.create_qos_bandwidth_limit_rule( policy_id=policy2['id'], max_kbps=5000, max_burst_kbps=2523) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_bandwidth_rule(policy2, rule2) @@ -1067,13 +1077,13 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_dscp_marking_rule( policy['id'], self.VALID_DSCP_MARK1) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_dscp_rule(policy, rule) @@ -1105,12 +1115,12 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_dscp_marking_rule( policy['id'], self.VALID_DSCP_MARK1) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_dscp_rule(policy, rule) @@ -1128,7 +1138,7 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_dscp_marking_rule( @@ -1139,7 +1149,7 @@ class QosDscpMarkingRuleTest(BaseQosTest): retrieved_rule = qos_client.show_dscp_marking_rule( rule['id'], policy['id']) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_dscp_rule(policy, retrieved_rule) self.assertEqual(self.VALID_DSCP_MARK2, retrieved_rule['dscp_mark']) @@ -1153,13 +1163,13 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) rule = self.create_qos_dscp_marking_rule( policy['id'], self.VALID_DSCP_MARK1) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_dscp_rule(policy, rule) @@ -1200,7 +1210,7 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy) self.assertRaises( @@ -1216,12 +1226,12 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy1['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy1) rule1 = self.create_qos_dscp_marking_rule( policy1['id'], self.VALID_DSCP_MARK1) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_dscp_rule(policy1, rule1) @@ -1230,12 +1240,12 @@ class QosDscpMarkingRuleTest(BaseQosTest): shared=False) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.adm_qos_client.delete_policy, policy2['id']) - #check backend if the policy was created + # check backend if the policy was created if CONF.network.backend == 'nsxp': self.verify_backend(policy2) rule2 = self.create_qos_dscp_marking_rule( policy2['id'], self.VALID_DSCP_MARK2) - #check backend if the rule was created + # check backend if the rule was created if CONF.network.backend == 'nsxp': self.verify_backend_dscp_rule(policy2, rule2) diff --git a/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_admin.py b/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_admin.py index ece7f80..c314fb4 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_admin.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_admin.py @@ -42,9 +42,6 @@ class LoadBalancersTest(base.BaseAdminTestCase): super(LoadBalancersTest, cls).resource_setup() cls.create_lb_kwargs = {'tenant_id': cls.subnet['tenant_id'], 'vip_subnet_id': cls.subnet['id']} - cls.load_balancer = \ - cls._create_active_load_balancer(**cls.create_lb_kwargs) - cls.load_balancer_id = cls.load_balancer['id'] @decorators.attr(type='smoke') @testtools.skipIf('1641902' in CONF.nsxv.bugs_to_resolve, diff --git a/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_non_admin.py b/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_non_admin.py index 5f2c99e..ee06331 100644 --- a/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_non_admin.py +++ b/vmware_nsx_tempest_plugin/tests/nsxv/api/lbaas/test_load_balancers_non_admin.py @@ -16,6 +16,7 @@ from oslo_log import log as logging import testtools from tempest import config +from tempest.lib.common.utils import data_utils from tempest.lib import decorators from tempest.lib import exceptions @@ -44,52 +45,89 @@ class LoadBalancersTest(base.BaseTestCase): super(LoadBalancersTest, cls).resource_setup() cls.create_lb_kwargs = {'tenant_id': cls.subnet['tenant_id'], 'vip_subnet_id': cls.subnet['id']} - cls.load_balancer = \ - cls._create_active_load_balancer(**cls.create_lb_kwargs) - cls.load_balancer_id = cls.load_balancer['id'] @decorators.attr(type='smoke') @decorators.idempotent_id('b7ea6c09-e077-4a67-859b-b2cd01e3b46b') def test_list_load_balancers(self): """Test list load balancers with one load balancer""" + load_balancer = \ + self._create_active_load_balancer(**self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) load_balancers = self._list_load_balancers() self.assertEqual(len(load_balancers), 1) - self.assertIn(self.load_balancer, load_balancers) + self.assertIn(load_balancer, load_balancers) @decorators.attr(type='smoke') @decorators.idempotent_id('8c2302df-ca94-4950-9826-eb996630a392') def test_list_load_balancers_two(self): """Test list load balancers with two load balancers""" - new_load_balancer = self._create_active_load_balancer( + load_balancer = self._create_active_load_balancer( **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) + rand_number = data_utils.rand_name() + network_name = 'lbaas-network-' + rand_number + router_name = 'lbaas-router-' + rand_number + network = self.create_network(network_name) + subnet = self.create_subnet(network) + router_cfg = dict( + router_name=router_name, + external_network_id=CONF.network.public_network_id) + router = self.create_router(**router_cfg) + self.create_router_interface(router['id'], subnet['id']) + create_lb_kwargs = {'tenant_id': subnet['tenant_id'], + 'vip_subnet_id': subnet['id']} + new_load_balancer = self._create_active_load_balancer( + **create_lb_kwargs) new_load_balancer_id = new_load_balancer['id'] self.addCleanup(self._delete_load_balancer, new_load_balancer_id) load_balancers = self._list_load_balancers() self.assertEqual(len(load_balancers), 2) - self.assertIn(self.load_balancer, load_balancers) + self.assertIn(load_balancer, load_balancers) self.assertIn(new_load_balancer, load_balancers) - self.assertNotEqual(self.load_balancer, new_load_balancer) + self.assertNotEqual(load_balancer, new_load_balancer) @decorators.attr(type='smoke') @decorators.idempotent_id('56345a78-1d53-4c05-9d7b-3e5cf34c22aa') def test_get_load_balancer(self): """Test get load balancer""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) load_balancer = self._show_load_balancer( - self.load_balancer_id) - self.assertEqual(self.load_balancer, load_balancer) + load_balancer_id) + self.assertEqual(load_balancer, load_balancer) @decorators.attr(type='smoke') @decorators.idempotent_id('5bf80330-d908-4025-9467-bca1727525c8') def test_create_load_balancer(self): """Test create load balancer""" - new_load_balancer = self._create_active_load_balancer( + load_balancer = self._create_active_load_balancer( **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) + rand_number = data_utils.rand_name() + network_name = 'lbaas-network-' + rand_number + router_name = 'lbaas-router-' + rand_number + network = self.create_network(network_name) + subnet = self.create_subnet(network) + router_cfg = dict( + router_name=router_name, + external_network_id=CONF.network.public_network_id) + router = self.create_router(**router_cfg) + self.create_router_interface(router['id'], subnet['id']) + create_lb_kwargs = {'tenant_id': subnet['tenant_id'], + 'vip_subnet_id': subnet['id']} + new_load_balancer = self._create_active_load_balancer( + **create_lb_kwargs) new_load_balancer_id = new_load_balancer['id'] self.addCleanup(self._delete_load_balancer, new_load_balancer_id) - load_balancer = self._show_load_balancer( + load_balancer_new = self._show_load_balancer( new_load_balancer_id) - self.assertEqual(new_load_balancer, load_balancer) - self.assertNotEqual(self.load_balancer, new_load_balancer) + self.assertEqual(new_load_balancer, load_balancer_new) + self.assertNotEqual(load_balancer, new_load_balancer) @decorators.attr(type='negative') @decorators.idempotent_id('66bf5390-154f-4627-af61-2c1c30325d6f') @@ -337,20 +375,28 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('1d92d98f-550f-4f05-a246-cdf4525459a2') def test_update_load_balancer(self): """Test update load balancer""" - self._update_load_balancer(self.load_balancer_id, + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) + self._update_load_balancer(load_balancer_id, name='new_name') load_balancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) self.assertEqual(load_balancer.get('name'), 'new_name') @decorators.attr(type='smoke') @decorators.idempotent_id('474ca200-8dea-4d20-8468-abc0169a445b') def test_update_load_balancer_empty_name(self): """Test update load balancer with empty name""" - self._update_load_balancer(self.load_balancer_id, + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) + self._update_load_balancer(load_balancer_id, name="") load_balancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) self.assertEqual(load_balancer.get('name'), "") @decorators.attr(type='negative') @@ -360,9 +406,13 @@ class LoadBalancersTest(base.BaseTestCase): Kilo: @decorators.skip_because(bug="1637877") """ + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) self.assertRaises(exceptions.BadRequest, self._update_load_balancer, - load_balancer_id=self.load_balancer_id, + load_balancer_id=load_balancer_id, wait=False, name='a' * 256) @@ -370,12 +420,16 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('62eef0ba-3859-4c8f-9e6a-8d6918754597') def test_update_load_balancer_missing_name(self): """Test update load balancer with missing name""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) loadbalancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) load_balancer_initial = loadbalancer['name'] - self._update_load_balancer(self.load_balancer_id) + self._update_load_balancer(load_balancer_id) load_balancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) load_balancer_new = load_balancer['name'] self.assertEqual(load_balancer_initial, load_balancer_new) @@ -386,9 +440,13 @@ class LoadBalancersTest(base.BaseTestCase): Kilo: @decorators.skip_because(bug="1637877") """ + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) self.assertRaises(exceptions.BadRequest, self._update_load_balancer, - load_balancer_id=self.load_balancer_id, + load_balancer_id=load_balancer_id, wait=False, description='a' * 256) @@ -396,22 +454,30 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('157ebdbf-4ad2-495d-b880-c1b1a8edc46d') def test_update_load_balancer_empty_description(self): """Test update load balancer with empty description""" - self._update_load_balancer(self.load_balancer_id, + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) + self._update_load_balancer(load_balancer_id, description="") load_balancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) self.assertEqual(load_balancer.get('description'), "") @decorators.attr(type='smoke') @decorators.idempotent_id('d13fa2f5-e8df-4d53-86a8-68583941200c') def test_update_load_balancer_missing_description(self): """Test update load balancer with missing description""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) loadbalancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) load_balancer_initial = loadbalancer['description'] - self._update_load_balancer(self.load_balancer_id) + self._update_load_balancer(load_balancer_id) load_balancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) load_balancer_new = load_balancer['description'] self.assertEqual(load_balancer_initial, load_balancer_new) @@ -419,9 +485,13 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('96e46a1a-62e7-47f1-98c5-9983f89e622f') def test_update_load_balancer_invalid_admin_state_up_field(self): """Test update load balancer with an invalid admin_state_up""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) self.assertRaises(exceptions.BadRequest, self._update_load_balancer, - load_balancer_id=self.load_balancer_id, + load_balancer_id=load_balancer_id, wait=False, admin_state_up="a&^%$jbc123") @@ -429,9 +499,13 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('48f1e227-8b15-4389-a050-7ce76f4b4d46') def test_update_load_balancer_empty_admin_state_up_field(self): """Test update load balancer with an empty admin_state_up""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) self.assertRaises(exceptions.BadRequest, self._update_load_balancer, - load_balancer_id=self.load_balancer_id, + load_balancer_id=load_balancer_id, wait=False, admin_state_up="") @@ -439,19 +513,27 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('a9182e53-ddaa-4f41-af54-585d983279ba') def test_update_load_balancer_missing_admin_state_up(self): """Test update load balancer with missing admin state field""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) loadbalancer = self._show_load_balancer( - self.load_balancer_id) + load_balancer_id) load_balancer_initial = loadbalancer['admin_state_up'] - self._update_load_balancer(self.load_balancer_id) + self._update_load_balancer(load_balancer_id) self.assertEqual(load_balancer_initial, True) @decorators.attr(type='negative') @decorators.idempotent_id('bfbe9339-d083-4a88-b6d6-015522809c3a') def test_update_load_balancer_incorrect_attribute(self): """Test update a load balancer with an extra, invalid attribute""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) self.assertRaises(exceptions.BadRequest, self._update_load_balancer, - load_balancer_id=self.load_balancer_id, + load_balancer_id=load_balancer_id, wait=False, name="lb_name", description="lb_name_description", @@ -462,8 +544,12 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('d2258984-6e9a-41d6-bffa-0543c8b1f2b0') def test_get_load_balancer_status_tree(self): """Test get load balancer status tree""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) statuses = self._show_load_balancer_status_tree( - self.load_balancer_id) + load_balancer_id) load_balancer = statuses['loadbalancer'] self.assertEqual("ONLINE", load_balancer['operating_status']) self.assertEqual("ACTIVE", load_balancer['provisioning_status']) @@ -473,8 +559,12 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('a23677a9-b770-4894-8be9-cd66590c228b') def test_get_load_balancer_stats(self): """Test get load balancer stats""" + load_balancer = self._create_active_load_balancer( + **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) stats = self._show_load_balancer_stats( - self.load_balancer_id) + load_balancer_id) self.assertEqual(0, stats['bytes_in']) self.assertEqual(0, stats['bytes_out']) self.assertEqual(0, stats['total_connections']) @@ -484,13 +574,29 @@ class LoadBalancersTest(base.BaseTestCase): @decorators.idempotent_id('f289f8df-a867-45cd-bee3-7ff08f5e96e0') def test_delete_load_balancer(self): """Test delete load balancer""" - new_load_balancer = self._create_active_load_balancer( + load_balancer = self._create_active_load_balancer( **self.create_lb_kwargs) + load_balancer_id = load_balancer['id'] + self.addCleanup(self._delete_load_balancer, load_balancer_id) + rand_number = data_utils.rand_name() + network_name = 'lbaas-network-' + rand_number + router_name = 'lbaas-router-' + rand_number + network = self.create_network(network_name) + subnet = self.create_subnet(network) + router_cfg = dict( + router_name=router_name, + external_network_id=CONF.network.public_network_id) + router = self.create_router(**router_cfg) + self.create_router_interface(router['id'], subnet['id']) + create_lb_kwargs = {'tenant_id': subnet['tenant_id'], + 'vip_subnet_id': subnet['id']} + new_load_balancer = self._create_active_load_balancer( + **create_lb_kwargs) new_load_balancer_id = new_load_balancer['id'] - load_balancer = self._show_load_balancer( + load_balancer_new = self._show_load_balancer( new_load_balancer_id) - self.assertEqual(new_load_balancer, load_balancer) - self.assertNotEqual(self.load_balancer, new_load_balancer) + self.assertEqual(new_load_balancer, load_balancer_new) + self.assertNotEqual(load_balancer, new_load_balancer) self._delete_load_balancer(new_load_balancer_id) self.assertRaises(exceptions.NotFound, self._show_load_balancer,