Drop RpcProxy usage from LBaaS code
Update the LBaaS code to stop using the RpcProxy compatibility class. The equivalent direct usage of oslo.messaging APIs are now used instead. Part of blueprint drop-rpc-compat. Change-Id: I381394507e4f2daf6d774f70087fef8833c9bab5
This commit is contained in:
parent
ffbdbe64e2
commit
1679cff51c
@ -12,13 +12,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo import messaging
|
||||
|
||||
from neutron.common import rpc as n_rpc
|
||||
|
||||
|
||||
class LbaasAgentApi(n_rpc.RpcProxy):
|
||||
class LbaasAgentApi(object):
|
||||
"""Agent side of the Agent to Plugin RPC API."""
|
||||
|
||||
API_VERSION = '2.0'
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 2.0 Generic API for agent based drivers
|
||||
@ -26,63 +27,43 @@ class LbaasAgentApi(n_rpc.RpcProxy):
|
||||
# - pool_deployed() and update_status() methods added;
|
||||
|
||||
def __init__(self, topic, context, host):
|
||||
super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
|
||||
self.context = context
|
||||
self.host = host
|
||||
target = messaging.Target(topic=topic, version='2.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def get_ready_devices(self):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('get_ready_devices', host=self.host)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'get_ready_devices', host=self.host)
|
||||
|
||||
def pool_destroyed(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('pool_destroyed', pool_id=pool_id)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'pool_destroyed', pool_id=pool_id)
|
||||
|
||||
def pool_deployed(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('pool_deployed', pool_id=pool_id)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'pool_deployed', pool_id=pool_id)
|
||||
|
||||
def get_logical_device(self, pool_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg(
|
||||
'get_logical_device',
|
||||
pool_id=pool_id
|
||||
)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'get_logical_device', pool_id=pool_id)
|
||||
|
||||
def update_status(self, obj_type, obj_id, status):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
|
||||
status=status)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'update_status', obj_type=obj_type,
|
||||
obj_id=obj_id, status=status)
|
||||
|
||||
def plug_vip_port(self, port_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('plug_vip_port', port_id=port_id, host=self.host)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'plug_vip_port', port_id=port_id,
|
||||
host=self.host)
|
||||
|
||||
def unplug_vip_port(self, port_id):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg('unplug_vip_port', port_id=port_id, host=self.host)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'unplug_vip_port', port_id=port_id,
|
||||
host=self.host)
|
||||
|
||||
def update_pool_stats(self, pool_id, stats):
|
||||
return self.call(
|
||||
self.context,
|
||||
self.make_msg(
|
||||
'update_pool_stats',
|
||||
pool_id=pool_id,
|
||||
stats=stats,
|
||||
host=self.host
|
||||
)
|
||||
)
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(self.context, 'update_pool_stats', pool_id=pool_id,
|
||||
stats=stats, host=self.host)
|
||||
|
@ -15,6 +15,7 @@
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -230,10 +231,9 @@ class LoadBalancerCallbacks(n_rpc.RpcCallback):
|
||||
self.plugin.update_pool_stats(context, pool_id, data=stats)
|
||||
|
||||
|
||||
class LoadBalancerAgentApi(n_rpc.RpcProxy):
|
||||
class LoadBalancerAgentApi(object):
|
||||
"""Plugin side of plugin to agent RPC API."""
|
||||
|
||||
BASE_RPC_API_VERSION = '2.0'
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 1.1 Support agent_updated call
|
||||
@ -243,71 +243,69 @@ class LoadBalancerAgentApi(n_rpc.RpcProxy):
|
||||
# object individually;
|
||||
|
||||
def __init__(self, topic):
|
||||
super(LoadBalancerAgentApi, self).__init__(
|
||||
topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _cast(self, context, method_name, method_args, host, version=None):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg(method_name, **method_args),
|
||||
topic='%s.%s' % (self.topic, host),
|
||||
version=version
|
||||
)
|
||||
target = messaging.Target(topic=topic, version='2.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def create_vip(self, context, vip, host):
|
||||
return self._cast(context, 'create_vip', {'vip': vip}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_vip', vip=vip)
|
||||
|
||||
def update_vip(self, context, old_vip, vip, host):
|
||||
return self._cast(context, 'update_vip',
|
||||
{'old_vip': old_vip, 'vip': vip}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_vip', old_vip=old_vip, vip=vip)
|
||||
|
||||
def delete_vip(self, context, vip, host):
|
||||
return self._cast(context, 'delete_vip', {'vip': vip}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_vip', vip=vip)
|
||||
|
||||
def create_pool(self, context, pool, host, driver_name):
|
||||
return self._cast(context, 'create_pool',
|
||||
{'pool': pool, 'driver_name': driver_name}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_pool', pool=pool, driver_name=driver_name)
|
||||
|
||||
def update_pool(self, context, old_pool, pool, host):
|
||||
return self._cast(context, 'update_pool',
|
||||
{'old_pool': old_pool, 'pool': pool}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_pool', old_pool=old_pool, pool=pool)
|
||||
|
||||
def delete_pool(self, context, pool, host):
|
||||
return self._cast(context, 'delete_pool', {'pool': pool}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_pool', pool=pool)
|
||||
|
||||
def create_member(self, context, member, host):
|
||||
return self._cast(context, 'create_member', {'member': member}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_member', member=member)
|
||||
|
||||
def update_member(self, context, old_member, member, host):
|
||||
return self._cast(context, 'update_member',
|
||||
{'old_member': old_member, 'member': member}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_member', old_member=old_member,
|
||||
member=member)
|
||||
|
||||
def delete_member(self, context, member, host):
|
||||
return self._cast(context, 'delete_member', {'member': member}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_member', member=member)
|
||||
|
||||
def create_pool_health_monitor(self, context, health_monitor, pool_id,
|
||||
host):
|
||||
return self._cast(context, 'create_pool_health_monitor',
|
||||
{'health_monitor': health_monitor,
|
||||
'pool_id': pool_id}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'create_pool_health_monitor',
|
||||
health_monitor=health_monitor, pool_id=pool_id)
|
||||
|
||||
def update_pool_health_monitor(self, context, old_health_monitor,
|
||||
health_monitor, pool_id, host):
|
||||
return self._cast(context, 'update_pool_health_monitor',
|
||||
{'old_health_monitor': old_health_monitor,
|
||||
'health_monitor': health_monitor,
|
||||
'pool_id': pool_id}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'update_pool_health_monitor',
|
||||
old_health_monitor=old_health_monitor,
|
||||
health_monitor=health_monitor, pool_id=pool_id)
|
||||
|
||||
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
|
||||
host):
|
||||
return self._cast(context, 'delete_pool_health_monitor',
|
||||
{'health_monitor': health_monitor,
|
||||
'pool_id': pool_id}, host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'delete_pool_health_monitor',
|
||||
health_monitor=health_monitor, pool_id=pool_id)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
return self._cast(context, 'agent_updated',
|
||||
{'payload': {'admin_state_up': admin_state_up}},
|
||||
host)
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, 'agent_updated',
|
||||
payload={'admin_state_up': admin_state_up})
|
||||
|
||||
|
||||
class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
|
||||
|
@ -12,6 +12,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
import mock
|
||||
|
||||
from neutron.services.loadbalancer.agent import agent_api as api
|
||||
@ -23,132 +25,57 @@ class TestApiCache(base.BaseTestCase):
|
||||
super(TestApiCache, self).setUp()
|
||||
|
||||
self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
|
||||
self.make_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
self.mock_call = mock.patch.object(self.api, 'call').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.host, 'host')
|
||||
self.assertEqual(self.api.context, mock.sentinel.context)
|
||||
|
||||
def test_get_ready_devices(self):
|
||||
self.assertEqual(
|
||||
self.api.get_ready_devices(),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
def _test_method(self, method, **kwargs):
|
||||
add_host = ('get_ready_devices', 'plug_vip_port', 'unplug_vip_port',
|
||||
'update_pool_stats')
|
||||
expected_kwargs = copy.copy(kwargs)
|
||||
if method in add_host:
|
||||
expected_kwargs['host'] = self.api.host
|
||||
|
||||
self.make_msg.assert_called_once_with('get_ready_devices', host='host')
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.api.client, 'call'),
|
||||
mock.patch.object(self.api.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
prepare_mock.return_value = self.api.client
|
||||
rpc_mock.return_value = 'foo'
|
||||
rv = getattr(self.api, method)(**kwargs)
|
||||
|
||||
self.assertEqual(rv, 'foo')
|
||||
|
||||
prepare_args = {}
|
||||
prepare_mock.assert_called_once_with(**prepare_args)
|
||||
|
||||
rpc_mock.assert_called_once_with(mock.sentinel.context, method,
|
||||
**expected_kwargs)
|
||||
|
||||
def test_get_ready_devices(self):
|
||||
self._test_method('get_ready_devices')
|
||||
|
||||
def test_get_logical_device(self):
|
||||
self.assertEqual(
|
||||
self.api.get_logical_device('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'get_logical_device',
|
||||
pool_id='pool_id')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('get_logical_device', pool_id='pool_id')
|
||||
|
||||
def test_pool_destroyed(self):
|
||||
self.assertEqual(
|
||||
self.api.pool_destroyed('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'pool_destroyed',
|
||||
pool_id='pool_id')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('pool_destroyed', pool_id='pool_id')
|
||||
|
||||
def test_pool_deployed(self):
|
||||
self.assertEqual(
|
||||
self.api.pool_deployed('pool_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'pool_deployed',
|
||||
pool_id='pool_id')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('pool_deployed', pool_id='pool_id')
|
||||
|
||||
def test_update_status(self):
|
||||
self.assertEqual(
|
||||
self.api.update_status('pool', 'pool_id', 'ACTIVE'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'update_status',
|
||||
obj_type='pool',
|
||||
obj_id='pool_id',
|
||||
status='ACTIVE')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value,
|
||||
)
|
||||
self._test_method('update_status', obj_type='type', obj_id='id',
|
||||
status='status')
|
||||
|
||||
def test_plug_vip_port(self):
|
||||
self.assertEqual(
|
||||
self.api.plug_vip_port('port_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'plug_vip_port',
|
||||
port_id='port_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('plug_vip_port', port_id='port_id')
|
||||
|
||||
def test_unplug_vip_port(self):
|
||||
self.assertEqual(
|
||||
self.api.unplug_vip_port('port_id'),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'unplug_vip_port',
|
||||
port_id='port_id',
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('unplug_vip_port', port_id='port_id')
|
||||
|
||||
def test_update_pool_stats(self):
|
||||
self.assertEqual(
|
||||
self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
|
||||
self.mock_call.return_value
|
||||
)
|
||||
|
||||
self.make_msg.assert_called_once_with(
|
||||
'update_pool_stats',
|
||||
pool_id='pool_id',
|
||||
stats={'stat': 'stat'},
|
||||
host='host')
|
||||
|
||||
self.mock_call.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.make_msg.return_value
|
||||
)
|
||||
self._test_method('update_pool_stats', pool_id='id', stats='stats')
|
||||
|
@ -434,30 +434,29 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
|
||||
super(TestLoadBalancerAgentApi, self).setUp()
|
||||
|
||||
self.api = agent_driver_base.LoadBalancerAgentApi('topic')
|
||||
self.mock_cast = mock.patch.object(self.api, 'cast').start()
|
||||
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.topic, 'topic')
|
||||
self.assertEqual(self.api.client.target.topic, 'topic')
|
||||
|
||||
def _call_test_helper(self, method_name, method_args):
|
||||
rv = getattr(self.api, method_name)(mock.sentinel.context,
|
||||
host='host',
|
||||
**method_args)
|
||||
self.assertEqual(rv, self.mock_cast.return_value)
|
||||
self.mock_cast.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.mock_msg.return_value,
|
||||
topic='topic.host',
|
||||
version=None
|
||||
)
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.api.client, 'cast'),
|
||||
mock.patch.object(self.api.client, 'prepare'),
|
||||
) as (
|
||||
rpc_mock, prepare_mock
|
||||
):
|
||||
prepare_mock.return_value = self.api.client
|
||||
getattr(self.api, method_name)(mock.sentinel.context,
|
||||
host='host',
|
||||
**method_args)
|
||||
|
||||
prepare_args = {'server': 'host'}
|
||||
prepare_mock.assert_called_once_with(**prepare_args)
|
||||
|
||||
if method_name == 'agent_updated':
|
||||
method_args = {'payload': method_args}
|
||||
self.mock_msg.assert_called_once_with(
|
||||
method_name,
|
||||
**method_args
|
||||
)
|
||||
rpc_mock.assert_called_once_with(mock.sentinel.context, method_name,
|
||||
**method_args)
|
||||
|
||||
def test_agent_updated(self):
|
||||
self._call_test_helper('agent_updated', {'admin_state_up': 'test'})
|
||||
|
Loading…
x
Reference in New Issue
Block a user