Merge "Add additional loadbalancer scenarios for octavia"

This commit is contained in:
Zuul 2018-12-18 05:04:46 +00:00 committed by Gerrit Code Review
commit 798caba8e1
18 changed files with 612 additions and 80 deletions

View File

@@ -25,6 +25,11 @@ Added
* Added neutron trunk scenarios
* Added barbican scenarios.
* Added octavia scenarios.
* [scenario plugin] Octavia.create_and_list_loadbalancers
* [scenario plugin] Octavia.create_and_delete_loadbalancers
* [scenario plugin] Octavia.create_and_update_loadbalancers
* [scenario plugin] Octavia.create_and_stats_loadbalancers
* [scenario plugin] Octavia.create_and_show_loadbalancers
* Support for osprofiler config in Devstack plugin.
* Added property 'floating_ip_enabled' in magnum cluster_templates context.

View File

@@ -21,7 +21,7 @@
failure_rate:
max: 0
-
title: Test base Octavia actions
title: Octavia.create_and_list_loadbalancers
workloads:
-
scenario:
@@ -38,3 +38,75 @@
sla:
failure_rate:
max: 0
-
title: Octavia.create_and_delete_loadbalancers
workloads:
-
scenario:
Octavia.create_and_delete_loadbalancers: {}
runner:
constant:
times: 2
concurrency: 2
contexts:
users:
tenants: 2
roles: ["load-balancer_member"]
network: {}
sla:
failure_rate:
max: 0
-
title: Octavia.create_and_update_loadbalancers
workloads:
-
scenario:
Octavia.create_and_update_loadbalancers: {}
runner:
constant:
times: 2
concurrency: 2
contexts:
users:
tenants: 2
roles: ["load-balancer_member"]
network: {}
sla:
failure_rate:
max: 0
-
title: Octavia.create_and_stats_loadbalancers
workloads:
-
scenario:
Octavia.create_and_stats_loadbalancers: {}
runner:
constant:
times: 2
concurrency: 2
contexts:
users:
tenants: 2
roles: ["load-balancer_member"]
network: {}
sla:
failure_rate:
max: 0
-
title: Octavia.create_and_show_loadbalancers
workloads:
-
scenario:
Octavia.create_and_show_loadbalancers: {}
runner:
constant:
times: 2
concurrency: 2
contexts:
users:
tenants: 2
roles: ["load-balancer_member"]
network: {}
sla:
failure_rate:
max: 0

View File

@@ -338,12 +338,13 @@ class OctaviaLoadbalancer(base.ResourceManager):
return self.raw_resource["name"]
def delete(self):
return self._client().load_balancer_delete(self.id())
return self._client().load_balancer_delete(
self.id(), cascade=True)
def is_deleted(self):
try:
self._client().load_balancer_show(self.id())
except Exception as e:
except Exception:
return True
return False
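
The cleanup change above switches to cascading deletion and treats any error from load_balancer_show as "already gone". A minimal standalone sketch of that delete-and-poll pattern, assuming an octaviaclient-style client object; the helper name and timings are illustrative:

import time

def purge_loadbalancer(client, lb_id, timeout=120, interval=2):
    """Cascade-delete a load balancer and poll until it is gone."""
    # cascade=True asks Octavia to remove listeners, pools and members
    # together with the balancer, so cleanup does not fail on
    # non-empty load balancers.
    client.load_balancer_delete(lb_id, cascade=True)
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            client.load_balancer_show(lb_id)
        except Exception:
            # Mirrors the broad except above: any failure to show the
            # balancer (typically a 404) counts as deleted.
            return True
        time.sleep(interval)
    return False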

View File

@@ -40,20 +40,212 @@ class OctaviaBase(scenario.OpenStackScenario):
platform="openstack")
class CreateAndListLoadbalancers(OctaviaBase):
def run(self):
def run(self, description=None, admin_state=True,
listeners=None, flavor_id=None, provider=None,
vip_qos_policy_id=None):
"""Create a loadbalancer per each subnet and then list loadbalancers.
Measure the "Octavia loadbalancer list" command performance.
The scenario creates a loadbalancer for every subnet and then lists
loadbalancers.
:param description: Human-readable description of the loadbalancer
:param admin_state: The administrative state of the loadbalancer,
which is up (true) or down (false)
:param listeners: The associated listener id, if any
:param flavor_id: The ID of the flavor
:param provider: Provider name for the loadbalancer
:param vip_qos_policy_id: The ID of the QoS policy
"""
subnets = []
loadbalancers = []
networks = self.context.get("tenant", {}).get("networks", [])
for network in networks:
for subnet_id in network.get("subnets", []):
lb = self.octavia.load_balancer_create(subnet_id)
loadbalancers.append(lb)
subnets.extend(network.get("subnets", []))
for subnet_id in subnets:
lb = self.octavia.load_balancer_create(
subnet_id=subnet_id,
description=description,
admin_state=admin_state,
listeners=listeners,
flavor_id=flavor_id,
provider=provider,
vip_qos_policy_id=vip_qos_policy_id)
loadbalancers.append(lb)
for loadbalancer in loadbalancers:
self.octavia.wait_for_loadbalancer_prov_status(loadbalancer)
self.octavia.load_balancer_list()
@validation.add("required_services", services=[consts.Service.OCTAVIA])
@validation.add("required_platform", platform="openstack", users=True)
@validation.add("required_contexts", contexts=["network"])
@scenario.configure(context={"cleanup@openstack": ["octavia"]},
name="Octavia.create_and_delete_loadbalancers",
platform="openstack")
class CreateAndDeleteLoadbalancers(OctaviaBase):
def run(self, description=None, admin_state=True,
listeners=None, flavor_id=None, provider=None,
vip_qos_policy_id=None):
"""Create a loadbalancer per each subnet and then delete loadbalancer
:param description: Human-readable description of the loadbalancer
:param admin_state: The administrative state of the loadbalancer,
which is up (true) or down (false)
:param listeners: The associated listener id, if any
:param flavor_id: The ID of the flavor
:param provider: Provider name for the loadbalancer
:param vip_qos_policy_id: The ID of the QoS policy
"""
subnets = []
loadbalancers = []
networks = self.context.get("tenant", {}).get("networks", [])
for network in networks:
subnets.extend(network.get("subnets", []))
for subnet_id in subnets:
lb = self.octavia.load_balancer_create(
subnet_id=subnet_id,
description=description,
admin_state=admin_state,
listeners=listeners,
flavor_id=flavor_id,
provider=provider,
vip_qos_policy_id=vip_qos_policy_id)
loadbalancers.append(lb)
for loadbalancer in loadbalancers:
self.octavia.wait_for_loadbalancer_prov_status(loadbalancer)
self.octavia.load_balancer_delete(
loadbalancer["loadbalancer"]["id"])
@validation.add("required_services", services=[consts.Service.OCTAVIA])
@validation.add("required_platform", platform="openstack", users=True)
@validation.add("required_contexts", contexts=["network"])
@scenario.configure(context={"cleanup@openstack": ["octavia"]},
name="Octavia.create_and_update_loadbalancers",
platform="openstack")
class CreateAndUpdateLoadBalancers(OctaviaBase):
def run(self, description=None, admin_state=True,
listeners=None, flavor_id=None, provider=None,
vip_qos_policy_id=None):
"""Create a loadbalancer per each subnet and then update
:param description: Human-readable description of the loadbalancer
:param admin_state: The administrative state of the loadbalancer,
which is up (true) or down (false)
:param listeners: The associated listener id, if any
:param flavor_id: The ID of the flavor
:param provider: Provider name for the loadbalancer
:param vip_qos_policy_id: The ID of the QoS policy
"""
subnets = []
loadbalancers = []
networks = self.context.get("tenant", {}).get("networks", [])
for network in networks:
subnets.extend(network.get("subnets", []))
for subnet_id in subnets:
lb = self.octavia.load_balancer_create(
subnet_id=subnet_id,
description=description,
admin_state=admin_state,
listeners=listeners,
flavor_id=flavor_id,
provider=provider,
vip_qos_policy_id=vip_qos_policy_id)
loadbalancers.append(lb)
update_loadbalancer = {
"name": self.generate_random_name()
}
for loadbalancer in loadbalancers:
self.octavia.wait_for_loadbalancer_prov_status(loadbalancer)
self.octavia.load_balancer_set(
lb_id=loadbalancer["loadbalancer"]["id"],
lb_update_args=update_loadbalancer)
@validation.add("required_services", services=[consts.Service.OCTAVIA])
@validation.add("required_platform", platform="openstack", users=True)
@validation.add("required_contexts", contexts=["network"])
@scenario.configure(context={"cleanup@openstack": ["octavia"]},
name="Octavia.create_and_stats_loadbalancers",
platform="openstack")
class CreateAndShowStatsLoadBalancers(OctaviaBase):
def run(self, description=None, admin_state=True,
listeners=None, flavor_id=None, provider=None,
vip_qos_policy_id=None):
"""Create a loadbalancer per each subnet and stats
:param description: Human-readable description of the loadbalancer
:param admin_state: The administrative state of the loadbalancer,
which is up (true) or down (false)
:param listeners: The associated listener id, if any
:param flavor_id: The ID of the flavor
:param provider: Provider name for the loadbalancer
:param vip_qos_policy_id: The ID of the QoS policy
"""
subnets = []
loadbalancers = []
networks = self.context.get("tenant", {}).get("networks", [])
for network in networks:
subnets.extend(network.get("subnets", []))
for subnet_id in subnets:
lb = self.octavia.load_balancer_create(
subnet_id=subnet_id,
description=description,
admin_state=admin_state,
listeners=listeners,
flavor_id=flavor_id,
provider=provider,
vip_qos_policy_id=vip_qos_policy_id)
loadbalancers.append(lb)
for loadbalancer in loadbalancers:
self.octavia.wait_for_loadbalancer_prov_status(loadbalancer)
self.octavia.load_balancer_stats_show(
loadbalancer["loadbalancer"])
@validation.add("required_services", services=[consts.Service.OCTAVIA])
@validation.add("required_platform", platform="openstack", users=True)
@validation.add("required_contexts", contexts=["network"])
@scenario.configure(context={"cleanup@openstack": ["octavia"]},
name="Octavia.create_and_show_loadbalancers",
platform="openstack")
class CreateAndShowLoadBalancers(OctaviaBase):
def run(self, description=None, admin_state=True,
listeners=None, flavor_id=None, provider=None,
vip_qos_policy_id=None):
"""Create a loadbalancer per each subnet and then compare
:param description: Human-readable description of the loadbalancer
:param admin_state: The administrative state of the loadbalancer,
which is up (true) or down (false)
:param listeners: The associated listener id, if any
:param flavor_id: The ID of the flavor
:param provider: Provider name for the loadbalancer
:param vip_qos_policy_id: The ID of the QoS policy
"""
subnets = []
loadbalancers = []
networks = self.context.get("tenant", {}).get("networks", [])
for network in networks:
subnets.extend(network.get("subnets", []))
for subnet_id in subnets:
lb = self.octavia.load_balancer_create(
subnet_id=subnet_id,
description=description,
admin_state=admin_state,
listeners=listeners,
flavor_id=flavor_id,
provider=provider,
vip_qos_policy_id=vip_qos_policy_id)
loadbalancers.append(lb)
for loadbalancer in loadbalancers:
self.octavia.wait_for_loadbalancer_prov_status(loadbalancer)
self.octavia.load_balancer_show(
loadbalancer["loadbalancer"])

View File

@@ -36,83 +36,85 @@ class Octavia(service.Service):
return self._clients.octavia().load_balancer_list()
@atomic.action_timer("octavia.load_balancer_show")
def load_balancer_show(self, lb_id):
def load_balancer_show(self, lb):
"""Show a load balancer
:param string lb_id:
ID of the load balancer to show
:param dict lb:
The load balancer dict to show
:return:
A dict of the specified load balancer's settings
"""
return self._clients.octavia().load_balancer_show(lb_id)
return self._clients.octavia().load_balancer_show(lb["id"])
@atomic.action_timer("octavia.load_balancer_create")
def load_balancer_create(self, subnet_id, params=None):
def load_balancer_create(self, subnet_id, description=None,
admin_state=None, listeners=None, flavor_id=None,
provider=None, vip_qos_policy_id=None):
"""Create a load balancer
:param subnet_id:
The ID of the subnet for the Virtual IP (VIP)
:param params:
Parameters to create the load balancer with (expects json=)
:return:
A dict of the created load balancer's settings
"""
lb_create_args = {
args = {
"name": self.generate_random_name(),
"description": description,
"listeners": listeners,
"provider": provider,
"admin_state_up": admin_state or True,
"vip_subnet_id": subnet_id,
"admin_state_up": True
"vip_qos_policy_id": vip_qos_policy_id,
}
if params:
lb_create_args.update(params)
return self._clients.octavia().load_balancer_create(
json={"loadbalancer": lb_create_args})
json={"loadbalancer": args})
@atomic.action_timer("octavia.load_balancer_delete")
def load_balancer_delete(self, lb_id):
def load_balancer_delete(self, lb_id, cascade=False):
"""Delete a load balancer
:param string lb_id:
The ID of the load balancer to delete
:param bool cascade:
Delete the load balancer and all of its child objects when true
:return:
Response Code from the API
"""
return self._clients.octavia().load_balancer_delete(lb_id)
return self._clients.octavia().load_balancer_delete(
lb_id, cascade=cascade)
@atomic.action_timer("octavia.load_balancer_set")
def load_balancer_set(self, lb_id, params):
def load_balancer_set(self, lb_id, lb_update_args):
"""Update a load balancer's settings
:param string lb_id:
The ID of the load balancer to update
:param params:
The dict of the load balancer to update
:param lb_update_args:
A dict of attributes to update on the load balancer
:return:
Response Code from API
"""
return self._clients.octavia().load_balancer_set(lb_id, params)
return self._clients.octavia().load_balancer_set(
lb_id, json={"loadbalancer": lb_update_args})
@atomic.action_timer("octavia.load_balancer_stats_show")
def load_balancer_stats_show(self, lb_id, **kwargs):
def load_balancer_stats_show(self, lb, **kwargs):
"""Shows the current statistics for a load balancer.
:param string lb_id:
ID of the load balancer
:param dict lb:
The load balancer dict to query
:return:
A dict of the specified load balancer's statistics
"""
return self._clients.octavia().load_balancer_stats_show(
lb_id, **kwargs)
lb["id"], **kwargs)
@atomic.action_timer("octavia.load_balancer_failover")
def load_balancer_failover(self, lb_id):
def load_balancer_failover(self, lb):
"""Trigger load balancer failover
:param string lb_id:
ID of the load balancer to failover
:param dict lb:
The load balancer dict to fail over
:return:
Response Code from the API
"""
return self._clients.octavia().load_balancer_failover(lb_id)
return self._clients.octavia().load_balancer_failover(lb["id"])
@atomic.action_timer("octavia.listener_list")
def listener_list(self, **kwargs):
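
Taken together, the reworked wrapper is driven with plain IDs and response dicts. A hypothetical usage sketch, assuming `clients` is an authenticated Rally OpenStack client factory; all names and values are illustrative:

svc = Octavia(clients, name_generator=lambda: "rally-lb")

# The create response nests the balancer under the "loadbalancer"
# key, which is what the scenarios above rely on.
lb = svc.load_balancer_create(subnet_id="subnet-uuid",
                              description="demo balancer")
lb_id = lb["loadbalancer"]["id"]

svc.load_balancer_set(lb_id, lb_update_args={"name": "renamed-lb"})
svc.load_balancer_stats_show(lb["loadbalancer"])  # expects the dict
svc.load_balancer_delete(lb_id, cascade=True)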

View File

@@ -0,0 +1,25 @@
{
"Octavia.create_and_delete_loadbalancers": [
{
"args": {},
"runner": {
"type": "constant",
"times": 5,
"concurrency": 2
},
"context": {
"users": {
"tenants": 2,
"users_per_tenant": 2
},
"roles": ["load-balancer_member"],
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -0,0 +1,18 @@
---
Octavia.create_and_delete_loadbalancers:
-
args: {}
runner:
type: "constant"
times: 5
concurrency: 2
context:
users:
tenants: 2
users_per_tenant: 2
roles:
- load-balancer_member
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1,6 +1,7 @@
{
"Octavia.create_and_list_loadbalancers": [
{
"args": {},
"runner": {
"type": "constant",
"times": 5,

View File

@@ -1,6 +1,7 @@
---
Octavia.create_and_list_loadbalancers:
-
args: {}
runner:
type: "constant"
times: 5

View File

@@ -0,0 +1,25 @@
{
"Octavia.create_and_show_loadbalancers": [
{
"args": {},
"runner": {
"type": "constant",
"times": 5,
"concurrency": 2
},
"context": {
"users": {
"tenants": 2,
"users_per_tenant": 2
},
"roles": ["load-balancer_member"],
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -0,0 +1,18 @@
---
Octavia.create_and_show_loadbalancers:
-
args: {}
runner:
type: "constant"
times: 5
concurrency: 2
context:
users:
tenants: 2
users_per_tenant: 2
roles:
- load-balancer_member
network: {}
sla:
failure_rate:
max: 0

View File

@@ -0,0 +1,25 @@
{
"Octavia.create_and_stats_loadbalancers": [
{
"args": {},
"runner": {
"type": "constant",
"times": 5,
"concurrency": 2
},
"context": {
"users": {
"tenants": 2,
"users_per_tenant": 2
},
"roles": ["load-balancer_member"],
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -0,0 +1,18 @@
---
Octavia.create_and_stats_loadbalancers:
-
args: {}
runner:
type: "constant"
times: 5
concurrency: 2
context:
users:
tenants: 2
users_per_tenant: 2
roles:
- load-balancer_member
network: {}
sla:
failure_rate:
max: 0

View File

@@ -0,0 +1,25 @@
{
"Octavia.create_and_update_loadbalancers": [
{
"args": {},
"runner": {
"type": "constant",
"times": 5,
"concurrency": 2
},
"context": {
"users": {
"tenants": 2,
"users_per_tenant": 2
},
"roles": ["load-balancer_member"],
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -0,0 +1,18 @@
---
Octavia.create_and_update_loadbalancers:
-
args: {}
runner:
type: "constant"
times: 5
concurrency: 2
context:
users:
tenants: 2
users_per_tenant: 2
roles:
- load-balancer_member
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1244,7 +1244,7 @@ class OctaviaResourceTestCase(test.TestCase):
octavia.delete()
octavia._client().load_balancer_delete.assert_called_once_with(
"test_id")
"test_id", cascade=True)
def test_is_deleted_false(self):
octavia = self.get_octavia()

View File

@@ -21,7 +21,14 @@ from tests.unit import test
class LoadBalancersTestCase(test.ScenarioTestCase):
def get_test_context(self):
def setUp(self):
super(LoadBalancersTestCase, self).setUp()
patch = mock.patch(
"rally_openstack.services.loadbalancer.octavia.Octavia")
self.addCleanup(patch.stop)
self.mock_loadbalancers = patch.start()
def _get_context(self):
context = super(LoadBalancersTestCase, self).get_test_context()
context.update({
"user": {
@@ -34,25 +41,84 @@
"subnets": ["fake_subnet"]}]}})
return context
def setUp(self):
super(LoadBalancersTestCase, self).setUp()
patch = mock.patch(
"rally_openstack.services.loadbalancer.octavia.Octavia")
self.addCleanup(patch.stop)
self.mock_loadbalancers = patch.start()
def test_loadbalancers(self):
def test_create_and_list_loadbalancers(self):
loadbalancer_service = self.mock_loadbalancers.return_value
scenario = loadbalancers.CreateAndListLoadbalancers(self.context)
scenario = loadbalancers.CreateAndListLoadbalancers(
self._get_context())
scenario.run()
networks = self.context["tenant"]["networks"]
subnets = []
mock_has_calls = []
for network in networks:
subnets.extend(network.get("subnets", []))
for subnet_id in subnets:
mock_has_calls.append(mock.call(subnet_id))
loadbalancer_service.load_balancer_create.assert_called_once_with(
subnet_id)
self.assertEqual(1, loadbalancer_service.load_balancer_list.call_count)
loadbalancer_service.load_balancer_list.assert_called_once_with()
def test_create_and_delete_loadbalancers(self):
loadbalancer_service = self.mock_loadbalancers.return_value
scenario = loadbalancers.CreateAndDeleteLoadbalancers(
self._get_context())
scenario.run()
lb = [{
"loadbalancer": {
"id": "loadbalancer-id"
}
}]
loadbalancer_service.load_balancer_create.return_value = lb
loadbalancer_service.load_balancer_create(
admin_state=True, description=None, flavor_id=None,
listeners=None, provider=None,
subnet_id="fake_subnet", vip_qos_policy_id=None)
self.assertEqual(1,
loadbalancer_service.load_balancer_delete.call_count)
def test_create_and_update_loadbalancers(self):
loadbalancer_service = self.mock_loadbalancers.return_value
scenario = loadbalancers.CreateAndUpdateLoadBalancers(
self._get_context())
scenario.run()
lb = [{
"loadbalancer": {
"id": "loadbalancer-id"
}
}]
loadbalancer_service.load_balancer_create.return_value = lb
loadbalancer_service.load_balancer_create(
admin_state=True, description=None, flavor_id=None,
listeners=None, provider=None,
subnet_id="fake_subnet", vip_qos_policy_id=None)
self.assertEqual(1,
loadbalancer_service.load_balancer_set.call_count)
def test_create_and_show_stats(self):
loadbalancer_service = self.mock_loadbalancers.return_value
scenario = loadbalancers.CreateAndShowStatsLoadBalancers(
self._get_context())
scenario.run()
lb = [{
"loadbalancer": {
"id": "loadbalancer-id"
}
}]
loadbalancer_service.load_balancer_create.return_value = lb
loadbalancer_service.load_balancer_create(
admin_state=True, description=None, flavor_id=None,
listeners=None, provider=None,
subnet_id="fake_subnet", vip_qos_policy_id=None)
self.assertEqual(
1, loadbalancer_service.load_balancer_stats_show.call_count)
def test_create_and_show_loadbalancers(self):
loadbalancer_service = self.mock_loadbalancers.return_value
scenario = loadbalancers.CreateAndShowLoadBalancers(
self._get_context())
scenario.run()
lb = [{
"loadbalancer": {
"id": "loadbalancer-id"
}
}]
lb_show = {"id": "loadbalancer-id"}
loadbalancer_service.load_balancer_create.return_value = lb
loadbalancer_service.load_balancer_show.return_value = lb_show
loadbalancer_service.load_balancer_create(
admin_state=True, description=None, flavor_id=None,
listeners=None, provider=None,
subnet_id="fake_subnet", vip_qos_policy_id=None)
self.assertEqual(1,
loadbalancer_service.load_balancer_show.call_count)
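
Each test patches the Octavia service class wholesale and asserts call counts on its mocked instance. A condensed sketch of that pattern for the delete scenario, assuming a context shaped like the one _get_context builds and that the scenario module lives at the import path implied by the patch target; names are illustrative:

import mock

from rally_openstack.scenarios.octavia import loadbalancers

def delete_flow_sketch(context):
    with mock.patch("rally_openstack.services.loadbalancer."
                    "octavia.Octavia") as svc_cls:
        svc = svc_cls.return_value
        # One subnet in the context yields exactly one create and
        # one matching delete with the created balancer's id.
        svc.load_balancer_create.return_value = {
            "loadbalancer": {"id": "lb-id"}}
        loadbalancers.CreateAndDeleteLoadbalancers(context).run()
        svc.load_balancer_delete.assert_called_once_with("lb-id")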

View File

@@ -32,6 +32,19 @@ class LoadBalancerServiceTestCase(test.TestCase):
self.service = octavia.Octavia(self.clients,
name_generator=self.name_generator)
def _get_context(self):
context = test.get_test_context()
context.update({
"user": {
"id": "fake_user",
"tenant_id": "fake_tenant",
"credential": mock.MagicMock()
},
"tenant": {"id": "fake_tenant",
"networks": [{"id": "fake_net",
"subnets": ["fake_subnet"]}]}})
return context
def atomic_actions(self):
return self.service._atomic_actions
@@ -43,55 +56,62 @@
"octavia.load_balancer_list")
def test_load_balancer_show(self):
self.service.load_balancer_show("fake_lb")
lb = {"id": "loadbalancer-id"}
self.service.load_balancer_show(lb)
self.service._clients.octavia().load_balancer_show \
.assert_called_once_with("fake_lb")
.assert_called_once_with(lb["id"])
self._test_atomic_action_timer(self.atomic_actions(),
"octavia.load_balancer_show")
def test_load_balancer_create(self):
self.service.generate_random_name = mock.MagicMock(
return_value="lb")
self.service.load_balancer_create("fake_subnet")
self.assertEqual(
1, self.service._clients.octavia().load_balancer_create.call_count)
self.service.load_balancer_create("subnet_id")
self.service._clients.octavia().load_balancer_create \
.assert_called_once_with(
json={"loadbalancer":
{
"vip_subnet_id": "fake_subnet",
"name": "lb",
"admin_state_up": True
}})
.assert_called_once_with(json={
"loadbalancer": {"name": "lb",
"admin_state_up": True,
"vip_qos_policy_id": None,
"listeners": None,
"provider": None,
"vip_subnet_id": "subnet_id",
"description": None}})
self._test_atomic_action_timer(self.atomic_actions(),
"octavia.load_balancer_create")
def test_load_balancer_delete(self):
self.service.load_balancer_delete("fake_lb")
self.service.load_balancer_delete("lb-id")
self.service._clients.octavia().load_balancer_delete \
.assert_called_once_with("fake_lb")
.assert_called_once_with("lb-id", cascade=False)
self._test_atomic_action_timer(self.atomic_actions(),
"octavia.load_balancer_delete")
def test_load_balancer_set(self):
self.service.load_balancer_set("fake_lb", params={})
self.service.generate_random_name = mock.MagicMock(
return_value="new_lb")
lb_update_args = {"name": "new_lb_name"}
self.service.load_balancer_set(
"lb-id", lb_update_args=lb_update_args)
self.service._clients.octavia().load_balancer_set \
.assert_called_once_with("fake_lb", {})
.assert_called_once_with(
"lb-id", json={"loadbalancer": {"name": "new_lb_name"}})
self._test_atomic_action_timer(self.atomic_actions(),
"octavia.load_balancer_set")
def test_load_balancer_stats_show(self):
lb = {"id": "new_lb"}
self.assertEqual(
self.service.load_balancer_stats_show(lb_id="fake_lb", kwargs={}),
self.service.load_balancer_stats_show(lb, kwargs={}),
self.service._clients.octavia()
.load_balancer_stats_show.return_value)
self._test_atomic_action_timer(self.atomic_actions(),
"octavia.load_balancer_stats_show")
def test_load_balancer_failover(self):
self.service.load_balancer_failover(lb_id="fake_lb")
lb = {"id": "new_lb"}
self.service.load_balancer_failover(lb)
self.service._clients.octavia().load_balancer_failover \
.assert_called_once_with("fake_lb")
.assert_called_once_with(lb["id"])
self._test_atomic_action_timer(self.atomic_actions(),
"octavia.load_balancer_failover")