Merge "Removed create_rpc_dispatcher methods"

This commit is contained in:
Jenkins 2014-06-20 23:25:14 +00:00 committed by Gerrit Code Review
commit a20e6036ec
47 changed files with 110 additions and 302 deletions

View File

@ -108,15 +108,15 @@ class Service(service.Service):
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
dispatcher = [self.manager]
endpoints = [self.manager]
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
self.conn.create_consumer(self.topic, endpoints, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
self.conn.create_consumer(node_topic, endpoints, fanout=False)
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
self.conn.create_consumer(self.topic, endpoints, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.

View File

@ -30,9 +30,6 @@ class MeteringRpcCallbacks(object):
def __init__(self, meter_plugin):
self.meter_plugin = meter_plugin
def create_rpc_dispatcher(self):
return [self]
def get_sync_data_metering(self, context, **kwargs):
l3_plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)

View File

@ -105,10 +105,10 @@ class RestProxyAgent(rpc_compat.RpcCallback,
self.topic = topics.AGENT
self.plugin_rpc = PluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
self.dispatcher = [self]
self.endpoints = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)

View File

@ -119,9 +119,6 @@ class RestProxyCallbacks(rpc_compat.RpcCallback,
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
return [self, agents_db.AgentExtRpcCallback()]
def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device)
port = self.get_port_and_sgs(port_id)
@ -505,9 +502,9 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
self._dhcp_agent_notifier
)
self.callbacks = RestProxyCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
self.endpoints = [RestProxyCallbacks(),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()

View File

@ -91,14 +91,6 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
# 1.1 Support Security Group RPC
TAP_PREFIX_LEN = 3
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
"""Get port from the brocade specific db."""
@ -262,10 +254,10 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.rpc_context = context.RequestContext('neutron', 'neutron',
is_admin=False)
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = BridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [BridgeRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)

View File

@ -68,14 +68,6 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback,
# Set RPC API version to 1.1 by default.
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this rpc manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return [self, agents_db.AgentExtRpcCallback()]
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
@ -135,9 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher()
self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
# Consume from all consumers in threads

View File

@ -97,16 +97,13 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
def _setup_rpc(self):
self.topic = topics.AGENT
self.dispatcher = self._create_rpc_dispatcher()
self.endpoints = [HyperVSecurityCallbackMixin(self)]
consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
def _create_rpc_dispatcher(self):
return [HyperVSecurityCallbackMixin(self)]
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
@ -165,13 +162,13 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.dispatcher = self._create_rpc_dispatcher()
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
@ -233,9 +230,6 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
network_type, physical_network,
segmentation_id, port['admin_state_up'])
def _create_rpc_dispatcher(self):
return [self]
def _get_vswitch_name(self, network_type, physical_network):
if network_type != p_const.TYPE_LOCAL:
vswitch_name = self._get_vswitch_for_physical_network(

View File

@ -190,10 +190,10 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
self.conn = rpc_compat.create_connection(new=True)
self.notifier = agent_notifier_api.AgentNotifierApi(
topics.AGENT)
self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()

View File

@ -18,7 +18,6 @@
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
from neutron.openstack.common import log as logging
@ -41,14 +40,6 @@ class HyperVRpcCallbacks(
self.notifier = notifier
self._db = hyperv_db.HyperVPluginDB()
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self, agents_db.AgentExtRpcCallback()]
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')

View File

@ -123,10 +123,10 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.context = context.get_admin_context_without_session()
self.dispatcher = self.create_rpc_dispatcher()
self.endpoints = [self]
consumers = [[constants.INFO, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
if self.polling_interval:
@ -154,9 +154,6 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
"connection-mode",
"out-of-band")
def create_rpc_dispatcher(self):
return [self]
def setup_integration_br(self, bridge_name, reset_br, out_of_band,
controller_ip=None):
'''Sets up the integration bridge.

View File

@ -48,13 +48,6 @@ class SdnveRpcCallbacks():
def __init__(self, notifier):
self.notifier = notifier # used to notify the agent
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self, agents_db.AgentExtRpcCallback()]
def sdnve_info(self, rpc_context, **kwargs):
'''Update new information.'''
info = kwargs.get('info')
@ -140,9 +133,9 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = SdnveRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
self.endpoints = [SdnveRpcCallbacks(self.notifier),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()

View File

@ -809,14 +809,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
getattr(self, method)(context, values)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self]
class LinuxBridgePluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
@ -876,9 +868,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.callbacks = LinuxBridgeRpcCallbacks(self.context,
self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self)]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
@ -886,7 +876,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION,
topics.UPDATE, cfg.CONF.host])
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval

View File

@ -65,14 +65,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
# Device names start with "tap"
TAP_PREFIX_LEN = 3
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
@ -281,10 +273,10 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = LinuxBridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [LinuxBridgeRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)

View File

@ -180,16 +180,6 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin):
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
This a basic implementation that will call the plugin like get_ports
and handle basic events
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return [self, agents_db.AgentExtRpcCallback()]
class MidonetPluginException(n_exc.NeutronException):
message = _("%(msg)s")
@ -382,9 +372,9 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = MidoRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
self.endpoints = [MidoRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()

View File

@ -25,6 +25,7 @@ from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
@ -126,11 +127,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
)
def start_rpc_listeners(self):
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()

View File

@ -19,7 +19,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
@ -58,14 +57,6 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# test in H3.
super(RpcCallbacks, self).__init__(notifier, type_manager)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def _device_to_port_id(cls, device):
# REVISIT(rkukura): Consider calling into MechanismDrivers to

View File

@ -210,15 +210,6 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version,
or support more than one class as the target of rpc messages,
override this method.
"""
return [self]
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
@ -268,14 +259,12 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.callbacks = MlnxEswitchRpcCallbacks(self.context,
self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [MlnxEswitchRpcCallbacks(self.context, self)]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)

View File

@ -28,6 +28,7 @@ from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
@ -120,10 +121,10 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)

View File

@ -18,7 +18,6 @@ from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
@ -40,15 +39,6 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback,
#to be compatible with Linux Bridge Agent on Network Node
TAP_PREFIX_LEN = 3
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
If a manager would like to set an RPC API version,
or support more than one class as the target of RPC messages,
override this method.
"""
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
"""Get port according to device.

View File

@ -156,11 +156,11 @@ class NECNeutronAgent(object):
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
self.dispatcher = [self.callback_nec, self.callback_sg]
self.endpoints = [self.callback_nec, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)

View File

@ -146,14 +146,14 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()
self.dispatcher = [
self.endpoints = [
NECPluginV2RPCCallbacks(self.safe_reference),
DhcpRpcCallback(),
L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
@ -715,14 +715,6 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
super(NECPluginV2RPCCallbacks, self).__init__()
self.plugin = plugin
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self]
def update_ports(self, rpc_context, **kwargs):
"""Update ports' information and activate/deavtivate them.

View File

@ -286,13 +286,13 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.dispatcher = self.create_rpc_dispatcher()
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
@ -344,14 +344,6 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
return
self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return [self]
def _provision_local_vlan_outbound_for_tunnel(self, lvid,
segmentation_id, ofports):
br = self.tun_br

View File

@ -119,11 +119,11 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback):
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
self.dispatcher = [self.callback_oc, self.callback_sg]
self.endpoints = [self.callback_oc, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)

View File

@ -58,10 +58,6 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback,
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager."""
return [self, agents_db.AgentExtRpcCallback()]
@staticmethod
def get_port_from_device(device):
port = nvsd_db.get_port_from_device(device)
@ -165,10 +161,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
self.callbacks = NVSDPluginRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [NVSDPluginRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()

View File

@ -249,7 +249,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.dispatcher = self.create_rpc_dispatcher()
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
@ -258,7 +258,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
if self.l2_pop:
consumers.append([topics.L2POPULATION,
topics.UPDATE, cfg.CONF.host])
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
@ -493,14 +493,6 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
else:
LOG.warning(_('Action %s not supported'), action)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self]
def provision_local_vlan(self, net_uuid, network_type, physical_network,
segmentation_id):
'''Provisions a local VLAN.

View File

@ -73,14 +73,6 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback,
self.notifier = notifier
self.tunnel_type = tunnel_type
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
port = ovs_db_v2.get_port_from_device(device)
@ -341,10 +333,10 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()

View File

@ -200,16 +200,13 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback,
self.topic = topics.AGENT
self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
self.dispatcher = self._create_rpc_dispatcher()
self.endpoints = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
def _create_rpc_dispatcher(self):
return [self]
def _setup_integration_br(self, root_helper, integ_br,
tunnel_ip, ovsdb_port, ovsdb_ip):
self.int_br = OVSBridge(integ_br, root_helper)

View File

@ -57,9 +57,6 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback,
super(RyuRpcCallbacks, self).__init__()
self.ofp_rest_api_addr = ofp_rest_api_addr
def create_rpc_dispatcher(self):
return [self]
def get_ofp_rest_api(self, context, **kwargs):
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
return self.ofp_rest_api_addr
@ -143,10 +140,9 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
def _create_all_tenant_network(self):

View File

@ -25,7 +25,6 @@ from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as ntn_exc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
from neutron.db import l3_db
@ -48,14 +47,6 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback,
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return [self, agents_db.AgentExtRpcCallback()]
def handle_network_dhcp_access(plugin, context, network, action):
pass

View File

@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.common import constants as const
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.vmware.common import config
@ -70,8 +71,9 @@ class DhcpMetadataAccess(object):
def _setup_rpc_dhcp_metadata(self, notifier=None):
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
self.endpoints = [nsx_rpc.NSXRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
self.conn.consume_in_threads()

View File

@ -40,9 +40,6 @@ class FirewallCallbacks(rpc_compat.RpcCallback):
super(FirewallCallbacks, self).__init__()
self.plugin = plugin
def create_rpc_dispatcher(self):
return [self]
def set_firewall_status(self, context, firewall_id, status, **kwargs):
"""Agent uses this to set a firewall's status."""
LOG.debug(_("set_firewall_status() called"))
@ -165,13 +162,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
"""Do the initialization for the firewall service plugin here."""
qdbapi.register_models()
self.callbacks = FirewallCallbacks(self)
self.endpoints = [FirewallCallbacks(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.FIREWALL_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = FirewallAgentApi(

View File

@ -39,14 +39,6 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback,
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return [self]
class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
@ -76,9 +68,8 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
self.conn = rpc_compat.create_connection(new=True)
self.agent_notifiers.update(
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.callbacks = L3RouterPluginRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
self.endpoints = [L3RouterPluginRpcCallbacks()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
self.conn.consume_in_threads()

View File

@ -64,9 +64,6 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback):
super(LoadBalancerCallbacks, self).__init__()
self.plugin = plugin
def create_rpc_dispatcher(self):
return [self, agents_db.AgentExtRpcCallback(self.plugin)]
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
agents = self.plugin.get_lbaas_agents(context,
@ -342,11 +339,14 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
if hasattr(self.plugin, 'agent_callbacks'):
return
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
self.plugin.agent_endpoints = [
LoadBalancerCallbacks(self.plugin),
agents_db.AgentExtRpcCallback(self.plugin)
]
self.plugin.conn = rpc_compat.create_connection(new=True)
self.plugin.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),
self.plugin.agent_endpoints,
fanout=False)
self.plugin.conn.consume_in_threads()

View File

@ -28,13 +28,11 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
def __init__(self):
super(MeteringPlugin, self).__init__()
self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.METERING_PLUGIN,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
topics.METERING_PLUGIN, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()

View File

@ -198,10 +198,8 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
self.service_state = {}
self.conn.create_consumer(
node_topic,
self.create_rpc_dispatcher(),
fanout=False)
self.endpoints = [self]
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = (
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
@ -225,9 +223,6 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
v['timeout']))
for k, v in csrs_found.items()])
def create_rpc_dispatcher(self):
return [self]
def vpnservice_updated(self, context, **kwargs):
"""Handle VPNaaS service driver change notifications."""
LOG.debug(_("Handling VPN service update notification '%s'"),

View File

@ -504,10 +504,8 @@ class IPsecDriver(device_drivers.DeviceDriver):
self.processes = {}
self.process_status_cache = {}
self.conn.create_consumer(
node_topic,
self.create_rpc_dispatcher(),
fanout=False)
self.endpoints = [self]
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
@ -515,9 +513,6 @@ class IPsecDriver(device_drivers.DeviceDriver):
self.process_status_cache_check.start(
interval=self.conf.ipsec.ipsec_status_check_interval)
def create_rpc_dispatcher(self):
return [self]
def _update_nat(self, vpnservice, func):
"""Setting up nat rule in iptables.

View File

@ -53,9 +53,6 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback):
super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
self.driver = driver
def create_rpc_dispatcher(self):
return [self]
def get_vpn_services_on_host(self, context, host=None):
"""Retuns info on the vpnservices on the host."""
plugin = self.driver.service_plugin
@ -88,12 +85,10 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
def __init__(self, service_plugin):
super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self)
self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.CISCO_IPSEC_DRIVER_TOPIC,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)

View File

@ -40,9 +40,6 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback):
super(IPsecVpnDriverCallBack, self).__init__()
self.driver = driver
def create_rpc_dispatcher(self):
return [self]
def get_vpn_services_on_host(self, context, host=None):
"""Returns the vpnservices on the host."""
plugin = self.driver.service_plugin
@ -73,12 +70,10 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
def __init__(self, service_plugin):
super(IPsecVPNDriver, self).__init__(service_plugin)
self.callbacks = IPsecVpnDriverCallBack(self)
self.endpoints = [IPsecVpnDriverCallBack(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
topics.IPSEC_DRIVER_TOPIC,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnAgentApi(
topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)

View File

@ -33,8 +33,6 @@ from neutron.db import portbindings_db # noqa
RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
DISPATCHER = CALLBACKS + '.create_rpc_dispatcher'
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection'
@ -61,15 +59,11 @@ class BigSwitchTestBase(object):
def setup_patches(self):
self.plugin_notifier_p = mock.patch(NOTIFIER)
# prevent rpc callback dispatcher from being created
self.callbacks_p = mock.patch(DISPATCHER,
new=lambda *args, **kwargs: None)
# prevent any greenthreads from spawning
self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
# prevent the consistency watchdog from starting
self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
self.addCleanup(db.clear_db)
self.callbacks_p.start()
self.plugin_notifier_p.start()
self.spawn_p.start()
self.watch_p.start()

View File

@ -32,7 +32,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
plugin = manager.NeutronManager.get_plugin()
self.notifier = plugin.notifier
self.rpc = plugin.callbacks
self.rpc = plugin.endpoints[0]
self.startHttpPatch()

View File

@ -62,7 +62,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
bound, status)
port_id = port['port']['id']
neutron_context = context.get_admin_context()
details = self.plugin.callbacks.get_device_details(
details = self.plugin.endpoints[0].get_device_details(
neutron_context, agent_id="theAgentId", device=port_id)
if bound:
self.assertEqual(details['network_type'], 'local')

View File

@ -74,7 +74,8 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device(port_id)
callbacks = plugin.endpoints[0]
port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
@ -85,7 +86,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)

View File

@ -136,7 +136,8 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device(port_id)
callbacks = plugin.endpoints[0]
port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
@ -148,7 +149,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)

View File

@ -84,7 +84,8 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device(port_id)
callbacks = plugin.endpoints[0]
port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
@ -95,7 +96,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)

View File

@ -73,7 +73,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device(port_id)
port_dict = plugin.endpoints[0].get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
@ -84,7 +84,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)

View File

@ -41,7 +41,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def setUp(self):
super(TestFirewallCallbacks,
self).setUp(fw_plugin=FW_PLUGIN_KLASS)
self.callbacks = self.plugin.callbacks
self.callbacks = self.plugin.endpoints[0]
def test_set_firewall_status(self):
ctx = context.get_admin_context()
@ -210,7 +210,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
def setUp(self):
super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
self.callbacks = self.plugin.callbacks
self.callbacks = self.plugin.endpoints[0]
def test_create_second_firewall_not_permitted(self):
with self.firewall():
@ -342,7 +342,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
for k, v in attrs.iteritems():
self.assertEqual(fw_db[k], v)
# cleanup the pending firewall
self.plugin.callbacks.firewall_deleted(ctx, fw_id)
self.plugin.endpoints[0].firewall_deleted(ctx, fw_id)
def test_delete_firewall_after_agent_delete(self):
ctx = context.get_admin_context()

View File

@ -83,31 +83,31 @@ class AgentPluginReportState(base.BaseTestCase):
class AgentRPCMethods(base.BaseTestCase):
def test_create_consumers(self):
dispatcher = mock.Mock()
endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
mock.call().create_consumer('foo-topic-op', dispatcher,
mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
def test_create_consumers_with_node_name(self):
dispatcher = mock.Mock()
endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
mock.call().create_consumer('foo-topic-op', dispatcher,
mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
mock.call().create_consumer('foo-topic-op.node1', dispatcher,
mock.call().create_consumer('foo-topic-op.node1', endpoints,
fanout=False),
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)