diff --git a/neutron/api/rpc/handlers/dvr_rpc.py b/neutron/api/rpc/handlers/dvr_rpc.py index ba648bb0ae..43a5d68acb 100644 --- a/neutron/api/rpc/handlers/dvr_rpc.py +++ b/neutron/api/rpc/handlers/dvr_rpc.py @@ -98,11 +98,9 @@ class DVRAgentRpcApiMixin(object): """Notify dvr mac address updates.""" if not dvr_macs: return - self.fanout_cast(context, - self.make_msg('dvr_mac_address_update', - dvr_macs=dvr_macs), - version=self.DVR_RPC_VERSION, - topic=self._get_dvr_update_topic()) + cctxt = self.client.prepare(topic=self._get_dvr_update_topic(), + version=self.DVR_RPC_VERSION, fanout=True) + cctxt.cast(context, 'dvr_mac_address_update', dvr_macs=dvr_macs) class DVRAgentRpcCallbackMixin(object): diff --git a/neutron/plugins/ml2/drivers/type_tunnel.py b/neutron/plugins/ml2/drivers/type_tunnel.py index b6f34eaa58..b0bad17834 100644 --- a/neutron/plugins/ml2/drivers/type_tunnel.py +++ b/neutron/plugins/ml2/drivers/type_tunnel.py @@ -187,8 +187,7 @@ class TunnelAgentRpcApiMixin(object): topics.UPDATE) def tunnel_update(self, context, tunnel_ip, tunnel_type): - self.fanout_cast(context, - self.make_msg('tunnel_update', - tunnel_ip=tunnel_ip, - tunnel_type=tunnel_type), - topic=self._get_tunnel_update_topic()) + cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(), + fanout=True) + cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip, + tunnel_type=tunnel_type) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 72fc955d17..d51d6d23b0 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging + from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc from neutron.common import constants as q_const @@ -172,8 +174,7 @@ class RpcCallbacks(n_rpc.RpcCallback, LOG.debug('Port %s not found during ARP update', port_id) -class AgentNotifierApi(n_rpc.RpcProxy, - dvr_rpc.DVRAgentRpcApiMixin, +class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): """Agent side of the openvswitch rpc API. @@ -185,30 +186,26 @@ class AgentNotifierApi(n_rpc.RpcProxy, """ - BASE_RPC_API_VERSION = '1.1' - def __init__(self, topic): - super(AgentNotifierApi, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic = topic self.topic_network_delete = topics.get_topic_name(topic, topics.NETWORK, topics.DELETE) self.topic_port_update = topics.get_topic_name(topic, topics.PORT, topics.UPDATE) + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) def network_delete(self, context, network_id): - self.fanout_cast(context, - self.make_msg('network_delete', - network_id=network_id), - topic=self.topic_network_delete) + cctxt = self.client.prepare(topic=self.topic_network_delete, + fanout=True) + cctxt.cast(context, 'network_delete', network_id=network_id) def port_update(self, context, port, network_type, segmentation_id, physical_network): - self.fanout_cast(context, - self.make_msg('port_update', - port=port, - network_type=network_type, - segmentation_id=segmentation_id, - physical_network=physical_network), - topic=self.topic_port_update) + cctxt = self.client.prepare(topic=self.topic_port_update, + fanout=True) + cctxt.cast(context, 'port_update', port=port, + network_type=network_type, segmentation_id=segmentation_id, + physical_network=physical_network) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a05d1dabf4..f4a5fb17ab 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -25,7 +25,6 @@ import mock from neutron.agent import rpc as agent_rpc from neutron.common import constants from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.openstack.common import context from neutron.plugins.ml2.drivers import type_tunnel @@ -166,35 +165,11 @@ class RpcCallbacksTestCase(base.BaseTestCase): class RpcApiTestCase(base.BaseTestCase): - def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method, - **kwargs): - # NOTE(russellb) This can be removed once AgentNotifierApi has been - # converted over to no longer use the RpcProxy compatibility class. - ctxt = context.RequestContext('fake_user', 'fake_project') - expected_retval = 'foo' if rpc_method == 'call' else None - expected_version = kwargs.pop('version', None) - expected_msg = rpcapi.make_msg(method, **kwargs) - - rpc = n_rpc.RpcProxy - with mock.patch.object(rpc, rpc_method) as rpc_method_mock: - rpc_method_mock.return_value = expected_retval - retval = getattr(rpcapi, method)(ctxt, **kwargs) - - self.assertEqual(retval, expected_retval) - additional_args = {} - if topic: - additional_args['topic'] = topic - if expected_version: - additional_args['version'] = expected_version - expected = [ - mock.call(ctxt, expected_msg, **additional_args) - ] - rpc_method_mock.assert_has_calls(expected) - def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if rpc_method == 'call' else None expected_version = kwargs.pop('version', None) + fanout = kwargs.pop('fanout', False) with contextlib.nested( mock.patch.object(rpcapi.client, rpc_method), @@ -209,6 +184,10 @@ class RpcApiTestCase(base.BaseTestCase): prepare_args = {} if expected_version: prepare_args['version'] = expected_version + if fanout: + prepare_args['fanout'] = fanout + if topic: + prepare_args['topic'] = topic prepare_mock.assert_called_once_with(**prepare_args) self.assertEqual(retval, expected_retval) @@ -216,35 +195,36 @@ class RpcApiTestCase(base.BaseTestCase): def test_delete_network(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, topics.NETWORK, topics.DELETE), - 'network_delete', rpc_method='fanout_cast', - network_id='fake_request_spec') + 'network_delete', rpc_method='cast', + fanout=True, network_id='fake_request_spec') def test_port_update(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE), - 'port_update', rpc_method='fanout_cast', - port='fake_port', + 'port_update', rpc_method='cast', + fanout=True, port='fake_port', network_type='fake_network_type', segmentation_id='fake_segmentation_id', physical_network='fake_physical_network') def test_tunnel_update(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, type_tunnel.TUNNEL, topics.UPDATE), - 'tunnel_update', rpc_method='fanout_cast', + 'tunnel_update', rpc_method='cast', + fanout=True, tunnel_ip='fake_ip', tunnel_type='gre') def test_device_details(self):