Don't instantiate RPC clients on import
In oslo.messaging port, we'll need to make sure no RPC clients or servers or notifiers are created before RPC layer is initialized using n_rpc.init(). This means that there should be no global objects that create those objects on __init__. There should also be no such class attributes because in that case import will itself instantiate the object, probably before RPC layer is ready. blueprint oslo-messaging Change-Id: Ia8a9fd39777c75e4253f5518c2de6be551cc365b
This commit is contained in:
parent
d8cf7fd7c5
commit
3e6393de51
@ -119,5 +119,3 @@ class L3AgentNotifyAPI(rpc_compat.RpcProxy):
|
|||||||
def router_added_to_agent(self, context, router_ids, host):
|
def router_added_to_agent(self, context, router_ids, host):
|
||||||
self._notification_host(context, 'router_added_to_agent',
|
self._notification_host(context, 'router_added_to_agent',
|
||||||
router_ids, host)
|
router_ids, host)
|
||||||
|
|
||||||
L3AgentNotify = L3AgentNotifyAPI()
|
|
||||||
|
@ -81,13 +81,22 @@ class FloatingIP(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
|
|||||||
class L3_NAT_db_mixin(l3.RouterPluginBase):
|
class L3_NAT_db_mixin(l3.RouterPluginBase):
|
||||||
"""Mixin class to add L3/NAT router methods to db_base_plugin_v2."""
|
"""Mixin class to add L3/NAT router methods to db_base_plugin_v2."""
|
||||||
|
|
||||||
l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotify
|
|
||||||
router_device_owners = (
|
router_device_owners = (
|
||||||
DEVICE_OWNER_ROUTER_INTF,
|
DEVICE_OWNER_ROUTER_INTF,
|
||||||
DEVICE_OWNER_ROUTER_GW,
|
DEVICE_OWNER_ROUTER_GW,
|
||||||
DEVICE_OWNER_FLOATINGIP
|
DEVICE_OWNER_FLOATINGIP
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def l3_rpc_notifier(self):
|
||||||
|
if not hasattr(self, '_l3_rpc_notifier'):
|
||||||
|
self._l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
|
return self._l3_rpc_notifier
|
||||||
|
|
||||||
|
@l3_rpc_notifier.setter
|
||||||
|
def l3_rpc_notifier(self, value):
|
||||||
|
self._l3_rpc_notifier = value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def _core_plugin(self):
|
def _core_plugin(self):
|
||||||
return manager.NeutronManager.get_plugin()
|
return manager.NeutronManager.get_plugin()
|
||||||
|
@ -275,7 +275,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||||
l3_rpc_agent_api.L3AgentNotify
|
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
)
|
)
|
||||||
|
|
||||||
def create_network(self, context, network):
|
def create_network(self, context, network):
|
||||||
|
@ -141,7 +141,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
for svc_topic in self.service_topics.values():
|
for svc_topic in self.service_topics.values():
|
||||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
# Consume from all consumers in a thread
|
# Consume from all consumers in a thread
|
||||||
self.conn.consume_in_thread()
|
self.conn.consume_in_thread()
|
||||||
|
|
||||||
|
@ -294,7 +294,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||||
l3_rpc_agent_api.L3AgentNotify
|
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
)
|
)
|
||||||
|
|
||||||
def _parse_network_vlan_ranges(self):
|
def _parse_network_vlan_ranges(self):
|
||||||
|
@ -34,6 +34,10 @@ LOG = logging.getLogger(__name__)
|
|||||||
class L2populationMechanismDriver(api.MechanismDriver,
|
class L2populationMechanismDriver(api.MechanismDriver,
|
||||||
l2pop_db.L2populationDbMixin):
|
l2pop_db.L2populationDbMixin):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(L2populationMechanismDriver, self).__init__()
|
||||||
|
self.L2populationAgentNotify = l2pop_rpc.L2populationAgentNotifyAPI()
|
||||||
|
|
||||||
def initialize(self):
|
def initialize(self):
|
||||||
LOG.debug(_("Experimental L2 population driver"))
|
LOG.debug(_("Experimental L2 population driver"))
|
||||||
self.rpc_ctx = n_context.get_admin_context_without_session()
|
self.rpc_ctx = n_context.get_admin_context_without_session()
|
||||||
@ -56,7 +60,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
|||||||
def delete_port_postcommit(self, context):
|
def delete_port_postcommit(self, context):
|
||||||
fanout_msg = self.deleted_ports.pop(context.current['id'], None)
|
fanout_msg = self.deleted_ports.pop(context.current['id'], None)
|
||||||
if fanout_msg:
|
if fanout_msg:
|
||||||
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
|
self.L2populationAgentNotify.remove_fdb_entries(
|
||||||
self.rpc_ctx, fanout_msg)
|
self.rpc_ctx, fanout_msg)
|
||||||
|
|
||||||
def _get_diff_ips(self, orig, port):
|
def _get_diff_ips(self, orig, port):
|
||||||
@ -90,7 +94,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
|||||||
if port_mac_ip:
|
if port_mac_ip:
|
||||||
ports['after'] = port_mac_ip
|
ports['after'] = port_mac_ip
|
||||||
|
|
||||||
l2pop_rpc.L2populationAgentNotify.update_fdb_entries(
|
self.L2populationAgentNotify.update_fdb_entries(
|
||||||
self.rpc_ctx, {'chg_ip': upd_fdb_entries})
|
self.rpc_ctx, {'chg_ip': upd_fdb_entries})
|
||||||
|
|
||||||
return True
|
return True
|
||||||
@ -114,14 +118,14 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
|||||||
self._update_port_up(context)
|
self._update_port_up(context)
|
||||||
elif port['status'] == const.PORT_STATUS_DOWN:
|
elif port['status'] == const.PORT_STATUS_DOWN:
|
||||||
fdb_entries = self._update_port_down(context, port)
|
fdb_entries = self._update_port_down(context, port)
|
||||||
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
|
self.L2populationAgentNotify.remove_fdb_entries(
|
||||||
self.rpc_ctx, fdb_entries)
|
self.rpc_ctx, fdb_entries)
|
||||||
elif port['status'] == const.PORT_STATUS_BUILD:
|
elif port['status'] == const.PORT_STATUS_BUILD:
|
||||||
orig = self.migrated_ports.pop(port['id'], None)
|
orig = self.migrated_ports.pop(port['id'], None)
|
||||||
if orig:
|
if orig:
|
||||||
# this port has been migrated : remove its entries from fdb
|
# this port has been migrated : remove its entries from fdb
|
||||||
fdb_entries = self._update_port_down(context, orig)
|
fdb_entries = self._update_port_down(context, orig)
|
||||||
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
|
self.L2populationAgentNotify.remove_fdb_entries(
|
||||||
self.rpc_ctx, fdb_entries)
|
self.rpc_ctx, fdb_entries)
|
||||||
|
|
||||||
def _get_port_infos(self, context, port):
|
def _get_port_infos(self, context, port):
|
||||||
@ -206,14 +210,14 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
|||||||
const.FLOODING_ENTRY)
|
const.FLOODING_ENTRY)
|
||||||
|
|
||||||
if ports.keys():
|
if ports.keys():
|
||||||
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
|
self.L2populationAgentNotify.add_fdb_entries(
|
||||||
self.rpc_ctx, agent_fdb_entries, agent_host)
|
self.rpc_ctx, agent_fdb_entries, agent_host)
|
||||||
|
|
||||||
# Notify other agents to add fdb rule for current port
|
# Notify other agents to add fdb rule for current port
|
||||||
other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
|
other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
|
||||||
|
|
||||||
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
|
self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
|
||||||
other_fdb_entries)
|
other_fdb_entries)
|
||||||
|
|
||||||
def _update_port_down(self, context, port_context,
|
def _update_port_down(self, context, port_context,
|
||||||
agent_active_ports_count_for_flooding=0):
|
agent_active_ports_count_for_flooding=0):
|
||||||
|
@ -84,5 +84,3 @@ class L2populationAgentNotifyAPI(rpc_compat.RpcProxy):
|
|||||||
else:
|
else:
|
||||||
self._notification_fanout(context, 'update_fdb_entries',
|
self._notification_fanout(context, 'update_fdb_entries',
|
||||||
fdb_entries)
|
fdb_entries)
|
||||||
|
|
||||||
L2populationAgentNotify = L2populationAgentNotifyAPI()
|
|
||||||
|
@ -131,7 +131,7 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||||
l3_rpc_agent_api.L3AgentNotify
|
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
)
|
)
|
||||||
|
|
||||||
def _parse_network_config(self):
|
def _parse_network_config(self):
|
||||||
|
@ -165,7 +165,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||||
l3_rpc_agent_api.L3AgentNotify
|
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.callbacks = NVSDPluginRpcCallbacks()
|
self.callbacks = NVSDPluginRpcCallbacks()
|
||||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||||
|
@ -341,7 +341,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||||
l3_rpc_agent_api.L3AgentNotify
|
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||||
)
|
)
|
||||||
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
|
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
|
||||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||||
|
@ -76,7 +76,7 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
|
|||||||
self.topic = topics.L3PLUGIN
|
self.topic = topics.L3PLUGIN
|
||||||
self.conn = rpc_compat.create_connection(new=True)
|
self.conn = rpc_compat.create_connection(new=True)
|
||||||
self.agent_notifiers.update(
|
self.agent_notifiers.update(
|
||||||
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotify})
|
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
|
||||||
self.callbacks = L3RouterPluginRpcCallbacks()
|
self.callbacks = L3RouterPluginRpcCallbacks()
|
||||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||||
|
Loading…
Reference in New Issue
Block a user