Make L3RpcCallback a separate callback class
RPC has a version of itself. In Neutron a plugin implements several RPC interface, so a single RPC version doesn't work. In Mixin callback class approach, RPC versioning depends on each plugin implementation and it makes harder to maintain RPC version appropriately. This patch series replaces mixin RPC callback of server side with a separate class. This commit handles server-side callback of L3-agent RPC interface. L3-agent server-side callback class is moved from db/ to api/rpc/handlers because it doesn't involve any db operations and defining all RPC interfaces in a single place sounds reasonable. Note that moving other L3-agent related RPC interface class to api/rpc/handlers will be done in a separate patch as this patch focuses on reorganizing the server-side RPC callback class. Partial-Bug: #1359416 Change-Id: Ie3f2c9b2ad907a1110e05fe94d42e41e93fbcaa7
This commit is contained in:
parent
afd098ec2b
commit
0684f0c22c
@ -17,6 +17,7 @@ from oslo.config import cfg
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import utils
|
||||
from neutron import context as neutron_context
|
||||
from neutron.extensions import l3
|
||||
@ -30,8 +31,14 @@ from neutron.plugins.common import constants as plugin_constants
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class L3RpcCallbackMixin(object):
|
||||
"""A mix-in that enable L3 agent rpc support in plugin implementations."""
|
||||
class L3RpcCallback(n_rpc.RpcCallback):
|
||||
"""L3 agent RPC callback in plugin implementations."""
|
||||
|
||||
# 1.0 L3PluginApi BASE_RPC_API_VERSION
|
||||
# 1.1 Support update_floatingip_statuses
|
||||
# 1.2 Added methods for DVR support
|
||||
# 1.3 Added a method that returns the list of activated services
|
||||
RPC_API_VERSION = '1.3'
|
||||
|
||||
@property
|
||||
def plugin(self):
|
@ -28,6 +28,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
@ -40,7 +41,6 @@ from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import portbindings_base
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
from neutron.extensions import portbindings
|
||||
@ -79,7 +79,6 @@ cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE")
|
||||
|
||||
class BridgeRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
"""Agent callback."""
|
||||
|
||||
@ -264,6 +263,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
is_admin=False)
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.endpoints = [BridgeRpcCallbacks(),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
@ -23,6 +23,7 @@ from oslo.config import cfg as q_conf
|
||||
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -36,7 +37,6 @@ from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import portbindings_db
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.extensions import providernet
|
||||
@ -59,8 +59,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class N1kvRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin):
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin):
|
||||
|
||||
"""Class to handle agent RPC calls."""
|
||||
|
||||
@ -128,7 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
|
||||
self.endpoints = [N1kvRpcCallbacks(),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.common import rpc as n_rpc
|
||||
@ -188,6 +189,7 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
|
||||
self.notifier = agent_notifier_api.AgentNotifierApi(
|
||||
topics.AGENT)
|
||||
self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
@ -17,7 +17,6 @@
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.hyperv import db as hyperv_db
|
||||
|
||||
@ -27,8 +26,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class HyperVRpcCallbacks(
|
||||
n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin):
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin):
|
||||
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
|
@ -20,6 +20,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -35,7 +36,6 @@ from neutron.db import external_net_db
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import portbindings_db
|
||||
from neutron.db import quota_db # noqa
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
@ -55,7 +55,6 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin
|
||||
):
|
||||
|
||||
@ -285,6 +284,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.endpoints = [LinuxBridgeRpcCallbacks(),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
@ -20,6 +20,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -120,6 +121,7 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
@ -18,7 +18,6 @@ from neutron.common import constants as q_const
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.mlnx.db import mlnx_db_v2 as db
|
||||
@ -28,7 +27,6 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class MlnxRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
# History
|
||||
# 1.1 Support Security Group RPC
|
||||
|
@ -17,6 +17,7 @@
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api import extensions as neutron_extensions
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes as attrs
|
||||
from neutron.common import constants as const
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -28,7 +29,6 @@ from neutron.db import allowedaddresspairs_db as addr_pair_db
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import portbindings_base
|
||||
from neutron.db import portbindings_db
|
||||
from neutron.db import quota_db # noqa
|
||||
@ -147,7 +147,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.endpoints = [
|
||||
NECPluginV2RPCCallbacks(self.safe_reference),
|
||||
DhcpRpcCallback(),
|
||||
L3RpcCallback(),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
self.callback_sg,
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
@ -686,12 +686,6 @@ class DhcpRpcCallback(n_rpc.RpcCallback,
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
|
||||
class L3RpcCallback(n_rpc.RpcCallback, l3_rpc_base.L3RpcCallbackMixin):
|
||||
# 1.0 L3PluginApi BASE_RPC_API_VERSION
|
||||
# 1.1 Support update_floatingip_statuses
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
|
||||
class SecurityGroupServerRpcCallback(
|
||||
n_rpc.RpcCallback,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
|
@ -21,6 +21,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as nexception
|
||||
from neutron.common import rpc as n_rpc
|
||||
@ -33,7 +34,6 @@ from neutron.db import external_net_db
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import portbindings_base
|
||||
from neutron.db import quota_db # noqa
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
@ -53,7 +53,6 @@ IPv6 = 6
|
||||
|
||||
class NVSDPluginRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
@ -162,6 +161,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||
)
|
||||
self.endpoints = [NVSDPluginRpcCallbacks(),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
@ -20,6 +20,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -36,7 +37,6 @@ from neutron.db import extradhcpopt_db
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import portbindings_db
|
||||
from neutron.db import quota_db # noqa
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
@ -60,7 +60,6 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class OVSRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
|
||||
# history
|
||||
@ -346,6 +345,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||
)
|
||||
self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
|
||||
l3_rpc.L3RpcCallback(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
@ -20,6 +20,7 @@ from ryu.app import client
|
||||
from ryu.app import rest_nw_id
|
||||
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.common import rpc as n_rpc
|
||||
@ -30,7 +31,6 @@ from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import external_net_db
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.db import models_v2
|
||||
from neutron.db import portbindings_base
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
@ -47,7 +47,6 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class RyuRpcCallbacks(n_rpc.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
@ -139,7 +138,8 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
|
||||
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host),
|
||||
l3_rpc.L3RpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -21,6 +21,7 @@ import threading
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import log
|
||||
from neutron.common import rpc as q_rpc
|
||||
@ -30,7 +31,6 @@ from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.openstack.common import excutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.common import constants
|
||||
@ -41,12 +41,6 @@ from neutron.plugins.ml2.drivers.arista.arista_l3_driver import NeutronNets # n
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AristaL3ServicePluginRpcCallbacks(q_rpc.RpcCallback,
|
||||
l3_rpc_base.L3RpcCallbackMixin):
|
||||
|
||||
RPC_API_VERSION = '1.2'
|
||||
|
||||
|
||||
class AristaL3ServicePlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
l3_gwmode_db.L3_NAT_db_mixin,
|
||||
@ -76,7 +70,7 @@ class AristaL3ServicePlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.conn = q_rpc.create_connection(new=True)
|
||||
self.agent_notifiers.update(
|
||||
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
|
||||
self.endpoints = [AristaL3ServicePluginRpcCallbacks()]
|
||||
self.endpoints = [l3_rpc.L3RpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -18,6 +18,7 @@
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
@ -26,20 +27,10 @@ from neutron.db import extraroute_db
|
||||
from neutron.db import l3_dvr_db
|
||||
from neutron.db import l3_dvrscheduler_db
|
||||
from neutron.db import l3_gwmode_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.plugins.common import constants
|
||||
|
||||
|
||||
class L3RouterPluginRpcCallbacks(n_rpc.RpcCallback,
|
||||
l3_rpc_base.L3RpcCallbackMixin):
|
||||
|
||||
RPC_API_VERSION = '1.3'
|
||||
# history
|
||||
# 1.2 Added methods for DVR support
|
||||
# 1.3 Added a method that returns the list of activated services
|
||||
|
||||
|
||||
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||
@ -70,7 +61,7 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
|
||||
self.conn = n_rpc.create_connection(new=True)
|
||||
self.agent_notifiers.update(
|
||||
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
|
||||
self.endpoints = [L3RouterPluginRpcCallbacks()]
|
||||
self.endpoints = [l3_rpc.L3RpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -15,8 +15,8 @@
|
||||
|
||||
import contextlib
|
||||
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.common import constants
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.tests.unit.nec import test_nec_plugin
|
||||
from neutron.tests.unit.openvswitch import test_agent_scheduler
|
||||
|
||||
@ -75,10 +75,10 @@ class NecL3AgentSchedulerWithOpenFlowRouter(
|
||||
self.router(arg_list=('provider',),
|
||||
provider='openflow'
|
||||
)) as (r1, r2):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_agents = self._list_l3_agents_hosting_router(
|
||||
r1['router']['id'])
|
||||
self.assertEqual(1, len(ret_a))
|
||||
@ -93,9 +93,9 @@ class NecL3AgentSchedulerWithOpenFlowRouter(
|
||||
self.router(arg_list=('provider',), provider='openflow'),
|
||||
self.router(arg_list=('provider',), provider='openflow')
|
||||
) as (r1, r2):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_agents_1 = self._list_l3_agents_hosting_router(
|
||||
r1['router']['id'])
|
||||
l3_agents_2 = self._list_l3_agents_hosting_router(
|
||||
|
@ -23,13 +23,13 @@ from webob import exc
|
||||
|
||||
from neutron.api import extensions
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.extensions import agent
|
||||
from neutron.extensions import dhcpagentscheduler
|
||||
from neutron.extensions import l3agentscheduler
|
||||
@ -645,11 +645,11 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
|
||||
def test_router_is_not_rescheduled_from_alive_agent(self):
|
||||
with self.router():
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
|
||||
# schedule the router to host A
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
with mock.patch('neutron.db.l3_agentschedulers_db.'
|
||||
'L3AgentSchedulerDbMixin.reschedule_router') as rr:
|
||||
# take down some unrelated agent and run reschedule check
|
||||
@ -658,24 +658,24 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
|
||||
def test_router_reschedule_from_dead_agent(self):
|
||||
with self.router():
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
|
||||
# schedule the router to host A
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
self._take_down_agent_and_run_reschedule(L3_HOSTA)
|
||||
|
||||
# B should now pick up the router
|
||||
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
self.assertEqual(ret_b, ret_a)
|
||||
|
||||
def test_router_no_reschedule_from_dead_admin_down_agent(self):
|
||||
with self.router() as r:
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
|
||||
# schedule the router to host A
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
self._set_agent_admin_state_up(L3_HOSTA, False)
|
||||
self._take_down_agent_and_run_reschedule(L3_HOSTA)
|
||||
|
||||
@ -687,28 +687,28 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
self.assertEqual(binding.l3_agent.host, L3_HOSTA)
|
||||
|
||||
# B should not pick up the router
|
||||
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
self.assertFalse(ret_b)
|
||||
|
||||
def test_router_auto_schedule_with_invalid_router(self):
|
||||
with self.router() as router:
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
# deleted router
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[router['router']['id']])
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[router['router']['id']])
|
||||
self.assertFalse(ret_a)
|
||||
# non-existent router
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[uuidutils.generate_uuid()])
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[uuidutils.generate_uuid()])
|
||||
self.assertFalse(ret_a)
|
||||
|
||||
def test_router_auto_schedule_with_hosted(self):
|
||||
with self.router() as router:
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])
|
||||
self.assertEqual(1, len(ret_a))
|
||||
@ -719,14 +719,14 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
|
||||
def test_router_auto_schedule_restart_l3_agent(self):
|
||||
with self.router():
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
|
||||
def test_router_auto_schedule_with_hosted_2(self):
|
||||
# one agent hosts one router
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
l3_hosta = {
|
||||
'binary': 'neutron-l3-agent',
|
||||
'host': L3_HOSTA,
|
||||
@ -744,13 +744,13 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
l3_hostb['host'] = L3_HOSTB
|
||||
with self.router() as router1:
|
||||
self._register_one_agent_state(l3_hosta)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._disable_agent(hosta_id, admin_state_up=False)
|
||||
with self.router() as router2:
|
||||
self._register_one_agent_state(l3_hostb)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_agents_1 = self._list_l3_agents_hosting_router(
|
||||
router1['router']['id'])
|
||||
l3_agents_2 = self._list_l3_agents_hosting_router(
|
||||
@ -773,7 +773,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
def test_router_auto_schedule_with_disabled(self):
|
||||
with contextlib.nested(self.router(),
|
||||
self.router()):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
@ -781,9 +781,9 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
L3_HOSTB)
|
||||
self._disable_agent(hosta_id)
|
||||
# first agent will not host router since it is disabled
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
# second agent will host all the routers since first is disabled.
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
|
||||
hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
|
||||
num_hostb_routers = len(hostb_routers['routers'])
|
||||
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
|
||||
@ -807,12 +807,12 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
'agent_type': constants.AGENT_TYPE_L3}
|
||||
with contextlib.nested(self.router(),
|
||||
self.router()) as (router1, router2):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
l3_hosta['configurations']['router_id'] = router1['router']['id']
|
||||
self._register_one_agent_state(l3_hosta)
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
|
||||
num_hosta_routers = len(hosta_routers['routers'])
|
||||
l3_agents_1 = self._list_l3_agents_hosting_router(
|
||||
@ -825,11 +825,11 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
self.assertEqual(0, len(l3_agents_2['agents']))
|
||||
|
||||
def test_rpc_sync_routers(self):
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
|
||||
# No routers
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
self.assertEqual(0, len(ret_a))
|
||||
|
||||
with contextlib.nested(self.router(),
|
||||
@ -838,26 +838,26 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
router_ids = [r['router']['id'] for r in routers]
|
||||
|
||||
# Get all routers
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
self.assertEqual(3, len(ret_a))
|
||||
self.assertEqual(set(router_ids), set([r['id'] for r in ret_a]))
|
||||
|
||||
# Get all routers (router_ids=None)
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=None)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=None)
|
||||
self.assertEqual(3, len(ret_a))
|
||||
self.assertEqual(set(router_ids), set([r['id'] for r in ret_a]))
|
||||
|
||||
# Get router2 only
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[router_ids[1]])
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[router_ids[1]])
|
||||
self.assertEqual(1, len(ret_a))
|
||||
self.assertIn(router_ids[1], [r['id'] for r in ret_a])
|
||||
|
||||
# Get router1 and router3
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[router_ids[0],
|
||||
router_ids[2]])
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=[router_ids[0],
|
||||
router_ids[2]])
|
||||
self.assertEqual(2, len(ret_a))
|
||||
self.assertIn(router_ids[0], [r['id'] for r in ret_a])
|
||||
self.assertIn(router_ids[2], [r['id'] for r in ret_a])
|
||||
@ -865,8 +865,8 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
def test_router_auto_schedule_for_specified_routers(self):
|
||||
|
||||
def _sync_router_with_ids(router_ids, exp_synced, exp_hosted, host_id):
|
||||
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=router_ids)
|
||||
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
|
||||
router_ids=router_ids)
|
||||
self.assertEqual(exp_synced, len(ret_a))
|
||||
for r in router_ids:
|
||||
self.assertIn(r, [r['id'] for r in ret_a])
|
||||
@ -874,7 +874,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
||||
num_host_routers = len(host_routers['routers'])
|
||||
self.assertEqual(exp_hosted, num_host_routers)
|
||||
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, L3_HOSTA)
|
||||
|
||||
|
@ -22,6 +22,7 @@ import netaddr
|
||||
from oslo.config import cfg
|
||||
from webob import exc
|
||||
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as l3_constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
@ -33,7 +34,6 @@ from neutron.db import l3_agentschedulers_db
|
||||
from neutron.db import l3_attrs_db
|
||||
from neutron.db import l3_db
|
||||
from neutron.db import l3_dvr_db
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.extensions import external_net
|
||||
from neutron.extensions import l3
|
||||
from neutron.extensions import portbindings
|
||||
@ -1992,15 +1992,15 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
|
||||
self.subnet(),
|
||||
self.subnet()) as (r, s1, s2):
|
||||
self._set_net_external(s1['subnet']['network_id'])
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_one_l3_agent(
|
||||
host='host1',
|
||||
ext_net_id=s1['subnet']['network_id'])
|
||||
self._register_one_l3_agent(
|
||||
host='host2', internal_only=False,
|
||||
ext_net_id=s2['subnet']['network_id'])
|
||||
l3_rpc.sync_routers(self.adminContext,
|
||||
host='host1')
|
||||
l3_rpc_cb.sync_routers(self.adminContext,
|
||||
host='host1')
|
||||
self._assert_router_on_agent(r['router']['id'], 'host1')
|
||||
|
||||
self._add_external_gateway_to_router(
|
||||
@ -2023,15 +2023,15 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
|
||||
self.subnet(),
|
||||
self.subnet()) as (r, s1, s2):
|
||||
self._set_net_external(s1['subnet']['network_id'])
|
||||
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_one_l3_agent(
|
||||
host='host1',
|
||||
ext_net_id=s1['subnet']['network_id'])
|
||||
self._register_one_l3_agent(
|
||||
host='host2', internal_only=False,
|
||||
ext_net_id='', ext_bridge='')
|
||||
l3_rpc.sync_routers(self.adminContext,
|
||||
host='host1')
|
||||
l3_rpc_cb.sync_routers(self.adminContext,
|
||||
host='host1')
|
||||
self._assert_router_on_agent(r['router']['id'], 'host1')
|
||||
|
||||
self._add_external_gateway_to_router(
|
||||
@ -2061,17 +2061,17 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
|
||||
expected_code=exc.HTTPBadRequest.code)
|
||||
|
||||
|
||||
class L3RpcCallbackMixinTestCase(base.BaseTestCase):
|
||||
class L3RpcCallbackTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(L3RpcCallbackMixinTestCase, self).setUp()
|
||||
super(L3RpcCallbackTestCase, self).setUp()
|
||||
self.mock_plugin = mock.patch.object(
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
l3_rpc.L3RpcCallback,
|
||||
'plugin', new_callable=mock.PropertyMock).start()
|
||||
self.mock_l3plugin = mock.patch.object(
|
||||
l3_rpc_base.L3RpcCallbackMixin,
|
||||
l3_rpc.L3RpcCallback,
|
||||
'l3plugin', new_callable=mock.PropertyMock).start()
|
||||
self.mixin = l3_rpc_base.L3RpcCallbackMixin()
|
||||
self.l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
|
||||
def test__ensure_host_set_on_port_update_on_concurrent_delete(self):
|
||||
port_id = 'foo_port_id'
|
||||
@ -2082,12 +2082,12 @@ class L3RpcCallbackMixinTestCase(base.BaseTestCase):
|
||||
portbindings.VIF_TYPE: portbindings.VIF_TYPE_BINDING_FAILED
|
||||
}
|
||||
router_id = 'foo_router_id'
|
||||
self.mixin.plugin.update_port.side_effect = n_exc.PortNotFound(
|
||||
self.l3_rpc_cb.plugin.update_port.side_effect = n_exc.PortNotFound(
|
||||
port_id=port_id)
|
||||
with mock.patch.object(l3_rpc_base.LOG, 'debug') as mock_log:
|
||||
self.mixin._ensure_host_set_on_port(
|
||||
with mock.patch.object(l3_rpc.LOG, 'debug') as mock_log:
|
||||
self.l3_rpc_cb._ensure_host_set_on_port(
|
||||
mock.ANY, mock.ANY, port, router_id)
|
||||
self.mixin.plugin.update_port.assert_called_once_with(
|
||||
self.l3_rpc_cb.plugin.update_port.assert_called_once_with(
|
||||
mock.ANY, port_id, {'port': {'binding:host_id': mock.ANY}})
|
||||
self.assertTrue(mock_log.call_count)
|
||||
expected_message = ('Port foo_port_id not found while updating '
|
||||
|
Loading…
x
Reference in New Issue
Block a user