Merge "Drop RpcProxy usage from neutron.agent.rpc.PluginApi"

This commit is contained in:
Jenkins 2014-11-20 20:10:10 +00:00 committed by Gerrit Code Review
commit 39637363a4
11 changed files with 283 additions and 182 deletions

View File

@ -69,7 +69,7 @@ class PluginReportStateAPI(object):
return method(context, 'report_state', **kwargs) return method(context, 'report_state', **kwargs)
class PluginApi(n_rpc.RpcProxy): class PluginApi(object):
'''Agent side of the rpc API. '''Agent side of the rpc API.
API version history: API version history:
@ -79,51 +79,45 @@ class PluginApi(n_rpc.RpcProxy):
the device port the device port
''' '''
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic): def __init__(self, topic):
super(PluginApi, self).__init__( target = messaging.Target(topic=topic, version='1.0')
topic=topic, default_version=self.BASE_RPC_API_VERSION) self.client = n_rpc.get_client(target)
def get_device_details(self, context, device, agent_id, host=None): def get_device_details(self, context, device, agent_id, host=None):
return self.call(context, cctxt = self.client.prepare()
self.make_msg('get_device_details', device=device, return cctxt.call(context, 'get_device_details', device=device,
agent_id=agent_id, host=host)) agent_id=agent_id, host=host)
def get_devices_details_list(self, context, devices, agent_id, host=None): def get_devices_details_list(self, context, devices, agent_id, host=None):
res = [] res = []
try: try:
res = self.call(context, cctxt = self.client.prepare(version='1.3')
self.make_msg('get_devices_details_list', res = cctxt.call(context, 'get_devices_details_list',
devices=devices, devices=devices, agent_id=agent_id, host=host)
agent_id=agent_id,
host=host),
version='1.3')
except messaging.UnsupportedVersion: except messaging.UnsupportedVersion:
# If the server has not been upgraded yet, a DVR-enabled agent # If the server has not been upgraded yet, a DVR-enabled agent
# may not work correctly, however it can function in 'degraded' # may not work correctly, however it can function in 'degraded'
# mode, in that DVR routers may not be in the system yet, and # mode, in that DVR routers may not be in the system yet, and
# it might be not necessary to retrieve info about the host. # it might be not necessary to retrieve info about the host.
LOG.warn(_LW('DVR functionality requires a server upgrade.')) LOG.warn(_LW('DVR functionality requires a server upgrade.'))
cctxt = self.client.prepare()
res = [ res = [
self.call(context, self.get_device_details(context, device, agent_id, host)
self.make_msg('get_device_details', device=device,
agent_id=agent_id, host=host))
for device in devices for device in devices
] ]
return res return res
def update_device_down(self, context, device, agent_id, host=None): def update_device_down(self, context, device, agent_id, host=None):
return self.call(context, cctxt = self.client.prepare()
self.make_msg('update_device_down', device=device, return cctxt.call(context, 'update_device_down', device=device,
agent_id=agent_id, host=host)) agent_id=agent_id, host=host)
def update_device_up(self, context, device, agent_id, host=None): def update_device_up(self, context, device, agent_id, host=None):
return self.call(context, cctxt = self.client.prepare()
self.make_msg('update_device_up', device=device, return cctxt.call(context, 'update_device_up', device=device,
agent_id=agent_id, host=host)) agent_id=agent_id, host=host)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None): def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
return self.call(context, cctxt = self.client.prepare()
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip, return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)) tunnel_type=tunnel_type)

View File

@ -89,18 +89,16 @@ class SecurityGroupServerRpcApiMixin(object):
def security_group_rules_for_devices(self, context, devices): def security_group_rules_for_devices(self, context, devices):
LOG.debug("Get security group rules " LOG.debug("Get security group rules "
"for devices via rpc %r", devices) "for devices via rpc %r", devices)
return self.call(context, cctxt = self.client.prepare(version='1.1')
self.make_msg('security_group_rules_for_devices', return cctxt.call(context, 'security_group_rules_for_devices',
devices=devices), devices=devices)
version='1.1')
def security_group_info_for_devices(self, context, devices): def security_group_info_for_devices(self, context, devices):
LOG.debug("Get security group information for devices via rpc %r", LOG.debug("Get security group information for devices via rpc %r",
devices) devices)
return self.call(context, cctxt = self.client.prepare(version='1.2')
self.make_msg('security_group_info_for_devices', return cctxt.call(context, 'security_group_info_for_devices',
devices=devices), devices=devices)
version='1.2')
class SecurityGroupAgentRpcCallbackMixin(object): class SecurityGroupAgentRpcCallbackMixin(object):
@ -358,6 +356,10 @@ class SecurityGroupAgentRpcMixin(object):
self.refresh_firewall(updated_devices) self.refresh_firewall(updated_devices)
# NOTE(russellb) This class has been conditionally converted to use the
# oslo.messaging APIs because it's a mix-in used in different places. The
# conditional usage is temporary until the whole code base has been converted
# to stop using the RpcProxy compatibility class.
class SecurityGroupAgentRpcApiMixin(object): class SecurityGroupAgentRpcApiMixin(object):
def _get_security_group_topic(self): def _get_security_group_topic(self):
@ -369,25 +371,45 @@ class SecurityGroupAgentRpcApiMixin(object):
"""Notify rule updated security groups.""" """Notify rule updated security groups."""
if not security_groups: if not security_groups:
return return
self.fanout_cast(context, if hasattr(self, 'client'):
self.make_msg('security_groups_rule_updated', cctxt = self.client.prepare(version=SG_RPC_VERSION,
security_groups=security_groups), topic=self._get_security_group_topic(),
version=SG_RPC_VERSION, fanout=True)
topic=self._get_security_group_topic()) cctxt.cast(context, 'security_groups_rule_updated',
security_groups=security_groups)
else:
self.fanout_cast(context,
self.make_msg('security_groups_rule_updated',
security_groups=security_groups),
version=SG_RPC_VERSION,
topic=self._get_security_group_topic())
def security_groups_member_updated(self, context, security_groups): def security_groups_member_updated(self, context, security_groups):
"""Notify member updated security groups.""" """Notify member updated security groups."""
if not security_groups: if not security_groups:
return return
self.fanout_cast(context, if hasattr(self, 'client'):
self.make_msg('security_groups_member_updated', cctxt = self.client.prepare(version=SG_RPC_VERSION,
security_groups=security_groups), topic=self._get_security_group_topic(),
version=SG_RPC_VERSION, fanout=True)
topic=self._get_security_group_topic()) cctxt.cast(context, 'security_groups_member_updated',
security_groups=security_groups)
else:
self.fanout_cast(context,
self.make_msg('security_groups_member_updated',
security_groups=security_groups),
version=SG_RPC_VERSION,
topic=self._get_security_group_topic())
def security_groups_provider_updated(self, context): def security_groups_provider_updated(self, context):
"""Notify provider updated security groups.""" """Notify provider updated security groups."""
self.fanout_cast(context, if hasattr(self, 'client'):
self.make_msg('security_groups_provider_updated'), cctxt = self.client.prepare(version=SG_RPC_VERSION,
version=SG_RPC_VERSION, topic=self._get_security_group_topic(),
topic=self._get_security_group_topic()) fanout=True)
cctxt.cast(context, 'security_groups_member_updated')
else:
self.fanout_cast(context,
self.make_msg('security_groups_provider_updated'),
version=SG_RPC_VERSION,
topic=self._get_security_group_topic())

View File

@ -43,7 +43,6 @@ LOG = logging.getLogger(__name__)
class NECPluginApi(agent_rpc.PluginApi): class NECPluginApi(agent_rpc.PluginApi):
BASE_RPC_API_VERSION = '1.0'
def update_ports(self, context, agent_id, datapath_id, def update_ports(self, context, agent_id, datapath_id,
port_added, port_removed): port_added, port_removed):
@ -51,13 +50,12 @@ class NECPluginApi(agent_rpc.PluginApi):
LOG.info(_("Update ports: added=%(added)s, " LOG.info(_("Update ports: added=%(added)s, "
"removed=%(removed)s"), "removed=%(removed)s"),
{'added': port_added, 'removed': port_removed}) {'added': port_added, 'removed': port_removed})
self.call(context, cctxt = self.client.prepare()
self.make_msg('update_ports', return cctxt.call(context, 'update_ports',
topic=topics.AGENT, agent_id=agent_id,
agent_id=agent_id, datapath_id=datapath_id,
datapath_id=datapath_id, port_added=port_added,
port_added=port_added, port_removed=port_removed)
port_removed=port_removed))
class NECAgentRpcCallback(n_rpc.RpcCallback): class NECAgentRpcCallback(n_rpc.RpcCallback):

View File

@ -166,8 +166,8 @@ class RyuPluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin): sg_rpc.SecurityGroupServerRpcApiMixin):
def get_ofp_rest_api_addr(self, context): def get_ofp_rest_api_addr(self, context):
LOG.debug(_("Get Ryu rest API address")) LOG.debug(_("Get Ryu rest API address"))
return self.call(context, cctxt = self.client.prepare()
self.make_msg('get_ofp_rest_api')) return cctxt.call(context, 'get_ofp_rest_api')
class RyuSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin): class RyuSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):

View File

@ -18,6 +18,7 @@
Unit Tests for hyperv neutron rpc Unit Tests for hyperv neutron rpc
""" """
import contextlib
import mock import mock
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
@ -31,8 +32,10 @@ from neutron.tests import base
class rpcHyperVApiTestCase(base.BaseTestCase): class rpcHyperVApiTestCase(base.BaseTestCase):
def _test_hyperv_neutron_api( def _test_hyperv_neutron_api_legacy(
self, rpcapi, topic, method, rpc_method, **kwargs): self, rpcapi, topic, method, rpc_method, **kwargs):
# NOTE(russellb) This version of the test method is used for interfaces
# not yet converted away from using the RpcProxy compatibility class.
ctxt = context.RequestContext('fake_user', 'fake_project') ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None) expected_version = kwargs.pop('version', None)
@ -54,9 +57,34 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
] ]
rpc_method_mock.assert_has_calls(expected) rpc_method_mock.assert_has_calls(expected)
def _test_hyperv_neutron_api(
self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
mock.patch.object(rpcapi.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = rpcapi.client
rpc_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
def test_delete_network(self): def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT) rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_neutron_api( self._test_hyperv_neutron_api_legacy(
rpcapi, rpcapi,
topics.get_topic_name( topics.get_topic_name(
topics.AGENT, topics.AGENT,
@ -67,7 +95,7 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
def test_port_update(self): def test_port_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT) rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_neutron_api( self._test_hyperv_neutron_api_legacy(
rpcapi, rpcapi,
topics.get_topic_name( topics.get_topic_name(
topics.AGENT, topics.AGENT,
@ -81,7 +109,7 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
def test_port_delete(self): def test_port_delete(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT) rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_neutron_api( self._test_hyperv_neutron_api_legacy(
rpcapi, rpcapi,
topics.get_topic_name( topics.get_topic_name(
topics.AGENT, topics.AGENT,
@ -92,7 +120,7 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
def test_tunnel_update(self): def test_tunnel_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT) rpcapi = ana.AgentNotifierApi(topics.AGENT)
self._test_hyperv_neutron_api( self._test_hyperv_neutron_api_legacy(
rpcapi, rpcapi,
topics.get_topic_name( topics.get_topic_name(
topics.AGENT, topics.AGENT,

View File

@ -18,6 +18,7 @@ Unit Tests for ml2 rpc
""" """
import collections import collections
import contextlib
import mock import mock
@ -165,7 +166,10 @@ class RpcCallbacksTestCase(base.BaseTestCase):
class RpcApiTestCase(base.BaseTestCase): class RpcApiTestCase(base.BaseTestCase):
def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs): def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method,
**kwargs):
# NOTE(russellb) This can be removed once AgentNotifierApi has been
# converted over to no longer use the RpcProxy compatibility class.
ctxt = context.RequestContext('fake_user', 'fake_project') ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None) expected_version = kwargs.pop('version', None)
@ -187,35 +191,61 @@ class RpcApiTestCase(base.BaseTestCase):
] ]
rpc_method_mock.assert_has_calls(expected) rpc_method_mock.assert_has_calls(expected)
def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
mock.patch.object(rpcapi.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = rpcapi.client
rpc_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
prepare_mock.assert_called_once_with(**prepare_args)
self.assertEqual(retval, expected_retval)
rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
def test_delete_network(self): def test_delete_network(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api(rpcapi, self._test_rpc_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
topics.NETWORK, topics.get_topic_name(topics.AGENT,
topics.DELETE), topics.NETWORK,
'network_delete', rpc_method='fanout_cast', topics.DELETE),
network_id='fake_request_spec') 'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
def test_port_update(self): def test_port_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api(rpcapi, self._test_rpc_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
topics.PORT, topics.get_topic_name(topics.AGENT,
topics.UPDATE), topics.PORT,
'port_update', rpc_method='fanout_cast', topics.UPDATE),
port='fake_port', 'port_update', rpc_method='fanout_cast',
network_type='fake_network_type', port='fake_port',
segmentation_id='fake_segmentation_id', network_type='fake_network_type',
physical_network='fake_physical_network') segmentation_id='fake_segmentation_id',
physical_network='fake_physical_network')
def test_tunnel_update(self): def test_tunnel_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api(rpcapi, self._test_rpc_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
type_tunnel.TUNNEL, topics.get_topic_name(topics.AGENT,
topics.UPDATE), type_tunnel.TUNNEL,
'tunnel_update', rpc_method='fanout_cast', topics.UPDATE),
tunnel_ip='fake_ip', tunnel_type='gre') 'tunnel_update', rpc_method='fanout_cast',
tunnel_ip='fake_ip', tunnel_type='gre')
def test_device_details(self): def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -17,6 +17,9 @@
Unit Tests for Mellanox RPC (major reuse of linuxbridge rpc unit tests) Unit Tests for Mellanox RPC (major reuse of linuxbridge rpc unit tests)
""" """
import contextlib
import mock
import fixtures import fixtures
from oslo.config import cfg from oslo.config import cfg
@ -29,8 +32,10 @@ from neutron.tests import base
class rpcApiTestCase(base.BaseTestCase): class rpcApiTestCase(base.BaseTestCase):
def _test_mlnx_api(self, rpcapi, topic, method, rpc_method, def _test_mlnx_api_legacy(self, rpcapi, topic, method, rpc_method,
expected_msg=None, **kwargs): expected_msg=None, **kwargs):
# NOTE(russellb) This method can be removed once the AgentNotifierApi
# has been converted to no longer use the RpcProxy class.
ctxt = context.RequestContext('fake_user', 'fake_project') ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None expected_retval = 'foo' if rpc_method == 'call' else None
expected_kwargs = {} expected_kwargs = {}
@ -64,14 +69,38 @@ class rpcApiTestCase(base.BaseTestCase):
self.assertEqual(expected_arg, arg) self.assertEqual(expected_arg, arg)
self.assertEqual(expected_kwargs, self.fake_kwargs) self.assertEqual(expected_kwargs, self.fake_kwargs)
def _test_mlnx_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
mock.patch.object(rpcapi.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = rpcapi.client
rpc_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
prepare_mock.assert_called_once_with(**prepare_args)
self.assertEqual(retval, expected_retval)
rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
def test_delete_network(self): def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT) rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
self._test_mlnx_api(rpcapi, self._test_mlnx_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
topics.NETWORK, topics.get_topic_name(topics.AGENT,
topics.DELETE), topics.NETWORK,
'network_delete', rpc_method='fanout_cast', topics.DELETE),
network_id='fake_request_spec') 'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
def test_port_update(self): def test_port_update(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT') cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
@ -81,16 +110,17 @@ class rpcApiTestCase(base.BaseTestCase):
network_type='vlan', network_type='vlan',
physical_network='fake_net', physical_network='fake_net',
segmentation_id='fake_vlan_id') segmentation_id='fake_vlan_id')
self._test_mlnx_api(rpcapi, self._test_mlnx_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
topics.PORT, topics.get_topic_name(topics.AGENT,
topics.UPDATE), topics.PORT,
'port_update', rpc_method='fanout_cast', topics.UPDATE),
expected_msg=expected_msg, 'port_update', rpc_method='fanout_cast',
port='fake_port', expected_msg=expected_msg,
network_type='vlan', port='fake_port',
physical_network='fake_net', network_type='vlan',
vlan_id='fake_vlan_id') physical_network='fake_net',
vlan_id='fake_vlan_id')
def test_port_update_ib(self): def test_port_update_ib(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT') cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
@ -100,16 +130,17 @@ class rpcApiTestCase(base.BaseTestCase):
network_type='ib', network_type='ib',
physical_network='fake_net', physical_network='fake_net',
segmentation_id='fake_vlan_id') segmentation_id='fake_vlan_id')
self._test_mlnx_api(rpcapi, self._test_mlnx_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
topics.PORT, topics.get_topic_name(topics.AGENT,
topics.UPDATE), topics.PORT,
'port_update', rpc_method='fanout_cast', topics.UPDATE),
expected_msg=expected_msg, 'port_update', rpc_method='fanout_cast',
port='fake_port', expected_msg=expected_msg,
network_type='ib', port='fake_port',
physical_network='fake_net', network_type='ib',
vlan_id='fake_vlan_id') physical_network='fake_net',
vlan_id='fake_vlan_id')
def test_port_update_old_agent(self): def test_port_update_old_agent(self):
cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT') cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT')
@ -120,16 +151,17 @@ class rpcApiTestCase(base.BaseTestCase):
physical_network='fake_net', physical_network='fake_net',
segmentation_id='fake_vlan_id', segmentation_id='fake_vlan_id',
vlan_id='fake_vlan_id') vlan_id='fake_vlan_id')
self._test_mlnx_api(rpcapi, self._test_mlnx_api_legacy(
topics.get_topic_name(topics.AGENT, rpcapi,
topics.PORT, topics.get_topic_name(topics.AGENT,
topics.UPDATE), topics.PORT,
'port_update', rpc_method='fanout_cast', topics.UPDATE),
expected_msg=expected_msg, 'port_update', rpc_method='fanout_cast',
port='fake_port', expected_msg=expected_msg,
network_type='vlan', port='fake_port',
physical_network='fake_net', network_type='vlan',
vlan_id='fake_vlan_id') physical_network='fake_net',
vlan_id='fake_vlan_id')
def test_device_details(self): def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -306,40 +306,26 @@ class TestNecAgentCallback(TestNecAgentBase):
class TestNecAgentPluginApi(TestNecAgentBase): class TestNecAgentPluginApi(TestNecAgentBase):
def _test_plugin_api(self, expected_failure=False): def test_plugin_api(self):
with contextlib.nested( with contextlib.nested(
mock.patch.object(nec_neutron_agent.NECPluginApi, 'make_msg'), mock.patch.object(self.agent.plugin_rpc.client, 'prepare'),
mock.patch.object(nec_neutron_agent.NECPluginApi, 'call'), mock.patch.object(self.agent.plugin_rpc.client, 'call'),
mock.patch.object(nec_neutron_agent, 'LOG') ) as (mock_prepare, mock_call):
) as (make_msg, apicall, log): mock_prepare.return_value = self.agent.plugin_rpc.client
agent_id = 'nec-q-agent.dummy-host' agent_id = 'nec-q-agent.dummy-host'
if expected_failure: port_added = [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
apicall.side_effect = Exception() {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}]
port_removed = ['id-3', 'id-4', 'id-5']
self.agent.plugin_rpc.update_ports( self.agent.plugin_rpc.update_ports(
mock.sentinel.ctx, agent_id, OVS_DPID_0X, mock.sentinel.ctx, agent_id, OVS_DPID_0X,
# port_added port_added, port_removed)
[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
# port_removed
['id-3', 'id-4', 'id-5'])
make_msg.assert_called_once_with( mock_call.assert_called_once_with(
'update_ports', topic='q-agent-notifier', mock.sentinel.ctx, 'update_ports',
agent_id=agent_id, datapath_id=OVS_DPID_0X, agent_id=agent_id, datapath_id=OVS_DPID_0X,
port_added=[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'}, port_added=port_added, port_removed=port_removed)
{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
port_removed=['id-3', 'id-4', 'id-5'])
apicall.assert_called_once_with(mock.sentinel.ctx,
make_msg.return_value)
self.assertTrue(log.info.called)
if expected_failure:
self.assertTrue(log.warn.called)
def test_plugin_api(self):
self._test_plugin_api()
class TestNecAgentMain(base.BaseTestCase): class TestNecAgentMain(base.BaseTestCase):

View File

@ -226,22 +226,19 @@ class TestOVSNeutronOFPRyuAgent(RyuAgentTestCase):
class TestRyuPluginApi(RyuAgentTestCase): class TestRyuPluginApi(RyuAgentTestCase):
def test_get_ofp_rest_api_addr(self): def test_get_ofp_rest_api_addr(self):
rpcapi = self.mod_agent.RyuPluginApi('foo')
with contextlib.nested( with contextlib.nested(
mock.patch(self._AGENT_NAME + '.RyuPluginApi.make_msg', mock.patch.object(rpcapi.client, 'call'),
return_value='msg'), mock.patch.object(rpcapi.client, 'prepare'),
mock.patch(self._AGENT_NAME + '.RyuPluginApi.call', ) as (
return_value='10.0.0.1') rpc_mock, prepare_mock
) as (mock_msg, mock_call): ):
api = self.mod_agent.RyuPluginApi('topics') prepare_mock.return_value = rpcapi.client
addr = api.get_ofp_rest_api_addr('context') rpc_mock.return_value = 'return'
addr = rpcapi.get_ofp_rest_api_addr('context')
self.assertEqual(addr, '10.0.0.1') self.assertEqual('return', addr)
mock_msg.assert_has_calls([ rpc_mock.assert_called_once_with('context', 'get_ofp_rest_api')
mock.call('get_ofp_rest_api')
])
mock_call.assert_has_calls([
mock.call('context', 'msg')
])
class TestVifPortSet(RyuAgentTestCase): class TestVifPortSet(RyuAgentTestCase):

View File

@ -27,8 +27,14 @@ class AgentRPCPluginApi(base.BaseTestCase):
agent = rpc.PluginApi('fake_topic') agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project') ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val = 'foo' expect_val = 'foo'
with mock.patch('neutron.common.rpc.RpcProxy.call') as rpc_call: with contextlib.nested(
rpc_call.return_value = expect_val mock.patch.object(agent.client, 'call'),
mock.patch.object(agent.client, 'prepare'),
) as (
mock_call, mock_prepare
):
mock_prepare.return_value = agent.client
mock_call.return_value = expect_val
func_obj = getattr(agent, method) func_obj = getattr(agent, method)
if method == 'tunnel_sync': if method == 'tunnel_sync':
actual_val = func_obj(ctxt, 'fake_tunnel_ip') actual_val = func_obj(ctxt, 'fake_tunnel_ip')
@ -47,8 +53,14 @@ class AgentRPCPluginApi(base.BaseTestCase):
ctxt = context.RequestContext('fake_user', 'fake_project') ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val_get_device_details = 'foo' expect_val_get_device_details = 'foo'
expect_val = [expect_val_get_device_details] expect_val = [expect_val_get_device_details]
with mock.patch('neutron.common.rpc.RpcProxy.call') as rpc_call: with contextlib.nested(
rpc_call.side_effect = [messaging.UnsupportedVersion('1.2'), mock.patch.object(agent.client, 'call'),
mock.patch.object(agent.client, 'prepare'),
) as (
mock_call, mock_prepare
):
mock_prepare.return_value = agent.client
mock_call.side_effect = [messaging.UnsupportedVersion('1.2'),
expect_val_get_device_details] expect_val_get_device_details]
func_obj = getattr(agent, 'get_devices_details_list') func_obj = getattr(agent, 'get_devices_details_list')
actual_val = func_obj(ctxt, ['fake_device'], 'fake_agent_id') actual_val = func_obj(ctxt, ['fake_device'], 'fake_agent_id')

View File

@ -1509,20 +1509,22 @@ class FakeSGRpcApi(agent_rpc.PluginApi,
class SecurityGroupServerRpcApiTestCase(base.BaseTestCase): class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
def setUp(self):
super(SecurityGroupServerRpcApiTestCase, self).setUp()
self.rpc = FakeSGRpcApi('fake_topic')
self.rpc.call = mock.Mock()
def test_security_group_rules_for_devices(self): def test_security_group_rules_for_devices(self):
self.rpc.security_group_rules_for_devices(None, ['fake_device']) rpcapi = FakeSGRpcApi('fake_topic')
self.rpc.call.assert_has_calls(
[mock.call(None, with contextlib.nested(
{'args': mock.patch.object(rpcapi.client, 'call'),
{'devices': ['fake_device']}, mock.patch.object(rpcapi.client, 'prepare'),
'method': 'security_group_rules_for_devices', ) as (
'namespace': None}, rpc_mock, prepare_mock
version='1.1')]) ):
prepare_mock.return_value = rpcapi.client
rpcapi.security_group_rules_for_devices('context', ['fake_device'])
rpc_mock.assert_called_once_with(
'context',
'security_group_rules_for_devices',
devices=['fake_device'])
class FakeSGNotifierAPI(n_rpc.RpcProxy, class FakeSGNotifierAPI(n_rpc.RpcProxy,