Merge "Drop RpcProxy usage from LBaaS code"
This commit is contained in:
commit
ab20b11513
@ -12,13 +12,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo import messaging
|
||||
|
||||
from neutron.common import rpc as n_rpc
|
||||
|
||||
|
||||
class LbaasAgentApi(n_rpc.RpcProxy):
|
||||
class LbaasAgentApi(object):
|
||||
"""Agent side of the Agent to Plugin RPC API."""
|
||||
|
||||
API_VERSION = '2.0'
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 2.0 Generic API for agent based drivers
|
||||
@ -26,63 +27,43 @@ class LbaasAgentApi(n_rpc.RpcProxy):
|
||||
# - pool_deployed() and update_status() methods added;
|
||||
|
||||
def __init__(self, topic, context, host):
|
||||
super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
|
||||
self.context = context
|
||||
self.host = host
|
||||
target = messaging.Target(topic=topic, version='2.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def get_ready_devices(self):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('get_ready_devices', host=self.host)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'get_ready_devices', host=self.host)
|
||||
|
||||
def pool_destroyed(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('pool_destroyed', pool_id=pool_id)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'pool_destroyed', pool_id=pool_id)
|
||||
|
||||
def pool_deployed(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('pool_deployed', pool_id=pool_id)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'pool_deployed', pool_id=pool_id)
|
||||
|
||||
def get_logical_device(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg(
|
||||
'get_logical_device',
|
||||
pool_id=pool_id
|
||||
)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'get_logical_device', pool_id=pool_id)
|
||||
|
||||
def update_status(self, obj_type, obj_id, status):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
|
||||
status=status)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'update_status', obj_type=obj_type,
|
||||
obj_id=obj_id, status=status)
|
||||
|
||||
def plug_vip_port(self, port_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('plug_vip_port', port_id=port_id, host=self.host)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'plug_vip_port', port_id=port_id,
|
||||
host=self.host)
|
||||
|
||||
def unplug_vip_port(self, port_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('unplug_vip_port', port_id=port_id, host=self.host)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'unplug_vip_port', port_id=port_id,
|
||||
host=self.host)
|
||||
|
||||
def update_pool_stats(self, pool_id, stats):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg(
|
||||
'update_pool_stats',
|
||||
pool_id=pool_id,
|
||||
stats=stats,
|
||||
host=self.host
|
||||
)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'update_pool_stats', pool_id=pool_id,
|
||||
stats=stats, host=self.host)
|
||||
|
@ -15,6 +15,7 @@
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -231,10 +232,9 @@ class LoadBalancerCallbacks(n_rpc.RpcCallback):
|
||||
self.plugin.update_pool_stats(context, pool_id, data=stats)
|
||||
|
||||
|
||||
class LoadBalancerAgentApi(n_rpc.RpcProxy):
|
||||
class LoadBalancerAgentApi(object):
|
||||
"""Plugin side of plugin to agent RPC API."""
|
||||
|
||||
BASE_RPC_API_VERSION = '2.0'
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 1.1 Support agent_updated call
|
||||
@ -244,71 +244,69 @@ class LoadBalancerAgentApi(n_rpc.RpcProxy):
|
||||
# object individually;
|
||||
|
||||
def __init__(self, topic):
|
||||
super(LoadBalancerAgentApi, self).__init__(
|
||||
topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _cast(self, context, method_name, method_args, host, version=None):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg(method_name, **method_args),
|
||||
topic='%s.%s' % (self.topic, host),
|
||||
version=version
|
||||
)
|
||||
target = messaging.Target(topic=topic, version='2.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def create_vip(self, context, vip, host):
|
||||
return self._cast(context, 'create_vip', {'vip': vip}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_vip', vip=vip)
|
||||
|
||||
def update_vip(self, context, old_vip, vip, host):
|
||||
return self._cast(context, 'update_vip',
|
||||
{'old_vip': old_vip, 'vip': vip}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_vip', old_vip=old_vip, vip=vip)
|
||||
|
||||
def delete_vip(self, context, vip, host):
|
||||
return self._cast(context, 'delete_vip', {'vip': vip}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_vip', vip=vip)
|
||||
|
||||
def create_pool(self, context, pool, host, driver_name):
|
||||
return self._cast(context, 'create_pool',
|
||||
{'pool': pool, 'driver_name': driver_name}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_pool', pool=pool, driver_name=driver_name)
|
||||
|
||||
def update_pool(self, context, old_pool, pool, host):
|
||||
return self._cast(context, 'update_pool',
|
||||
{'old_pool': old_pool, 'pool': pool}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_pool', old_pool=old_pool, pool=pool)
|
||||
|
||||
def delete_pool(self, context, pool, host):
|
||||
return self._cast(context, 'delete_pool', {'pool': pool}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_pool', pool=pool)
|
||||
|
||||
def create_member(self, context, member, host):
|
||||
return self._cast(context, 'create_member', {'member': member}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_member', member=member)
|
||||
|
||||
def update_member(self, context, old_member, member, host):
|
||||
return self._cast(context, 'update_member',
|
||||
{'old_member': old_member, 'member': member}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_member', old_member=old_member,
|
||||
member=member)
|
||||
|
||||
def delete_member(self, context, member, host):
|
||||
return self._cast(context, 'delete_member', {'member': member}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_member', member=member)
|
||||
|
||||
def create_pool_health_monitor(self, context, health_monitor, pool_id,
|
||||
host):
|
||||
return self._cast(context, 'create_pool_health_monitor',
|
||||
{'health_monitor': health_monitor,
|
||||
'pool_id': pool_id}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_pool_health_monitor',
|
||||
health_monitor=health_monitor, pool_id=pool_id)
|
||||
|
||||
def update_pool_health_monitor(self, context, old_health_monitor,
|
||||
health_monitor, pool_id, host):
|
||||
return self._cast(context, 'update_pool_health_monitor',
|
||||
{'old_health_monitor': old_health_monitor,
|
||||
'health_monitor': health_monitor,
|
||||
'pool_id': pool_id}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_pool_health_monitor',
|
||||
old_health_monitor=old_health_monitor,
|
||||
health_monitor=health_monitor, pool_id=pool_id)
|
||||
|
||||
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
|
||||
host):
|
||||
return self._cast(context, 'delete_pool_health_monitor',
|
||||
{'health_monitor': health_monitor,
|
||||
'pool_id': pool_id}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_pool_health_monitor',
|
||||
health_monitor=health_monitor, pool_id=pool_id)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
return self._cast(context, 'agent_updated',
|
||||
{'payload': {'admin_state_up': admin_state_up}},
|
||||
host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'agent_updated',
|
||||
payload={'admin_state_up': admin_state_up})
|
||||
|
||||
|
||||
class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
|
||||
|
@ -12,6 +12,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
import mock
|
||||
|
||||
from neutron.services.loadbalancer.agent import agent_api as api
|
||||
@ -23,132 +25,57 @@ class TestApiCache(base.BaseTestCase):
|
||||
super(TestApiCache, self).setUp()
|
||||
|
||||
self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
|
||||
self.make_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
self.mock_call = mock.patch.object(self.api, 'call').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.host, 'host')
|
||||
self.assertEqual(self.api.context, mock.sentinel.context)
|
||||
|
||||
def test_get_ready_devices(self):
|
||||
self.assertEqual(
|
||||
self.api.get_ready_devices(),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
def _test_method(self, method, **kwargs):
|
||||
add_host = ('get_ready_devices', 'plug_vip_port', 'unplug_vip_port',
|
||||
'update_pool_stats')
|
||||
expected_kwargs = copy.copy(kwargs)
|
||||
if method in add_host:
|
||||
expected_kwargs['host'] = self.api.host
|
||||
|
||||
self.make_msg.assert_called_once_with('get_ready_devices', host='host')
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.api.client, 'call'),
|
||||
mock.patch.object(self.api.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
prepare_mock.return_value = self.api.client
|
||||
rpc_mock.return_value = 'foo'
|
||||
rv = getattr(self.api, method)(**kwargs)
|
||||
|
||||
self.assertEqual(rv, 'foo')
|
||||
|
||||
prepare_args = {}
|
||||
prepare_mock.assert_called_once_with(**prepare_args)
|
||||
|
||||
rpc_mock.assert_called_once_with(mock.sentinel.context, method,
|
||||
**expected_kwargs)
|
||||
|
||||
def test_get_ready_devices(self):
|
||||
self._test_method('get_ready_devices')
|
||||
|
||||
def test_get_logical_device(self):
|
||||
self.assertEqual(
|
||||
self.api.get_logical_device('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'get_logical_device',
|
||||
pool_id='pool_id')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('get_logical_device', pool_id='pool_id')
|
||||
|
||||
def test_pool_destroyed(self):
|
||||
self.assertEqual(
|
||||
self.api.pool_destroyed('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'pool_destroyed',
|
||||
pool_id='pool_id')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('pool_destroyed', pool_id='pool_id')
|
||||
|
||||
def test_pool_deployed(self):
|
||||
self.assertEqual(
|
||||
self.api.pool_deployed('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'pool_deployed',
|
||||
pool_id='pool_id')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('pool_deployed', pool_id='pool_id')
|
||||
|
||||
def test_update_status(self):
|
||||
self.assertEqual(
|
||||
self.api.update_status('pool', 'pool_id', 'ACTIVE'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'update_status',
|
||||
obj_type='pool',
|
||||
obj_id='pool_id',
|
||||
status='ACTIVE')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
)
|
||||
self._test_method('update_status', obj_type='type', obj_id='id',
|
||||
status='status')
|
||||
|
||||
def test_plug_vip_port(self):
|
||||
self.assertEqual(
|
||||
self.api.plug_vip_port('port_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'plug_vip_port',
|
||||
port_id='port_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('plug_vip_port', port_id='port_id')
|
||||
|
||||
def test_unplug_vip_port(self):
|
||||
self.assertEqual(
|
||||
self.api.unplug_vip_port('port_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'unplug_vip_port',
|
||||
port_id='port_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('unplug_vip_port', port_id='port_id')
|
||||
|
||||
def test_update_pool_stats(self):
|
||||
self.assertEqual(
|
||||
self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'update_pool_stats',
|
||||
pool_id='pool_id',
|
||||
stats={'stat': 'stat'},
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('update_pool_stats', pool_id='id', stats='stats')
|
||||
|
@ -434,30 +434,29 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
|
||||
super(TestLoadBalancerAgentApi, self).setUp()
|
||||
|
||||
self.api = agent_driver_base.LoadBalancerAgentApi('topic')
|
||||
self.mock_cast = mock.patch.object(self.api, 'cast').start()
|
||||
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.topic, 'topic')
|
||||
self.assertEqual(self.api.client.target.topic, 'topic')
|
||||
|
||||
def _call_test_helper(self, method_name, method_args):
|
||||
rv = getattr(self.api, method_name)(mock.sentinel.context,
|
||||
host='host',
|
||||
**method_args)
|
||||
self.assertEqual(rv, self.mock_cast.return_value)
|
||||
self.mock_cast.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.mock_msg.return_value,
|
||||
topic='topic.host',
|
||||
version=None
|
||||
)
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.api.client, 'cast'),
|
||||
mock.patch.object(self.api.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
prepare_mock.return_value = self.api.client
|
||||
getattr(self.api, method_name)(mock.sentinel.context,
|
||||
host='host',
|
||||
**method_args)
|
||||
|
||||
prepare_args = {'server': 'host'}
|
||||
prepare_mock.assert_called_once_with(**prepare_args)
|
||||
|
||||
if method_name == 'agent_updated':
|
||||
method_args = {'payload': method_args}
|
||||
self.mock_msg.assert_called_once_with(
|
||||
method_name,
|
||||
**method_args
|
||||
)
|
||||
rpc_mock.assert_called_once_with(mock.sentinel.context, method_name,
|
||||
**method_args)
|
||||
|
||||
def test_agent_updated(self):
|
||||
self._call_test_helper('agent_updated', {'admin_state_up': 'test'})
|
||||
|
Loading…
x
Reference in New Issue
Block a user