Merge "Drop RpcProxy usage from ml2 AgentNotifierApi"

This commit is contained in:
Jenkins 2014-12-02 00:01:56 +00:00 committed by Gerrit Code Review
commit 168fbd80e5
4 changed files with 35 additions and 61 deletions

View File

@ -98,11 +98,9 @@ class DVRAgentRpcApiMixin(object):
"""Notify dvr mac address updates."""
if not dvr_macs:
return
self.fanout_cast(context,
self.make_msg('dvr_mac_address_update',
dvr_macs=dvr_macs),
version=self.DVR_RPC_VERSION,
topic=self._get_dvr_update_topic())
cctxt = self.client.prepare(topic=self._get_dvr_update_topic(),
version=self.DVR_RPC_VERSION, fanout=True)
cctxt.cast(context, 'dvr_mac_address_update', dvr_macs=dvr_macs)
class DVRAgentRpcCallbackMixin(object):

View File

@ -187,8 +187,7 @@ class TunnelAgentRpcApiMixin(object):
topics.UPDATE)
def tunnel_update(self, context, tunnel_ip, tunnel_type):
self.fanout_cast(context,
self.make_msg('tunnel_update',
tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type),
topic=self._get_tunnel_update_topic())
cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(),
fanout=True)
cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import constants as q_const
@ -172,8 +174,7 @@ class RpcCallbacks(n_rpc.RpcCallback,
LOG.debug('Port %s not found during ARP update', port_id)
class AgentNotifierApi(n_rpc.RpcProxy,
dvr_rpc.DVRAgentRpcApiMixin,
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.
@ -185,30 +186,26 @@ class AgentNotifierApi(n_rpc.RpcProxy,
"""
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def network_delete(self, context, network_id):
self.fanout_cast(context,
self.make_msg('network_delete',
network_id=network_id),
topic=self.topic_network_delete)
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
self.fanout_cast(context,
self.make_msg('port_update',
port=port,
network_type=network_type,
segmentation_id=segmentation_id,
physical_network=physical_network),
topic=self.topic_port_update)
cctxt = self.client.prepare(topic=self.topic_port_update,
fanout=True)
cctxt.cast(context, 'port_update', port=port,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)

View File

@ -25,7 +25,6 @@ import mock
from neutron.agent import rpc as agent_rpc
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import context
from neutron.plugins.ml2.drivers import type_tunnel
@ -166,35 +165,11 @@ class RpcCallbacksTestCase(base.BaseTestCase):
class RpcApiTestCase(base.BaseTestCase):
def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method,
**kwargs):
# NOTE(russellb) This can be removed once AgentNotifierApi has been
# converted over to no longer use the RpcProxy compatibility class.
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
expected_msg = rpcapi.make_msg(method, **kwargs)
rpc = n_rpc.RpcProxy
with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
rpc_method_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
additional_args = {}
if topic:
additional_args['topic'] = topic
if expected_version:
additional_args['version'] = expected_version
expected = [
mock.call(ctxt, expected_msg, **additional_args)
]
rpc_method_mock.assert_has_calls(expected)
def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
fanout = kwargs.pop('fanout', False)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
@ -209,6 +184,10 @@ class RpcApiTestCase(base.BaseTestCase):
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
if fanout:
prepare_args['fanout'] = fanout
if topic:
prepare_args['topic'] = topic
prepare_mock.assert_called_once_with(**prepare_args)
self.assertEqual(retval, expected_retval)
@ -216,35 +195,36 @@ class RpcApiTestCase(base.BaseTestCase):
def test_delete_network(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api_legacy(
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
'network_delete', rpc_method='cast',
fanout=True, network_id='fake_request_spec')
def test_port_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api_legacy(
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
port='fake_port',
'port_update', rpc_method='cast',
fanout=True, port='fake_port',
network_type='fake_network_type',
segmentation_id='fake_segmentation_id',
physical_network='fake_physical_network')
def test_tunnel_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api_legacy(
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
type_tunnel.TUNNEL,
topics.UPDATE),
'tunnel_update', rpc_method='fanout_cast',
'tunnel_update', rpc_method='cast',
fanout=True,
tunnel_ip='fake_ip', tunnel_type='gre')
def test_device_details(self):