diff --git a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py index 22240951cb..9a75fd4140 100644 --- a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py @@ -15,6 +15,8 @@ import random +from oslo import messaging + from neutron.common import constants from neutron.common import rpc as n_rpc from neutron.common import topics @@ -28,23 +30,20 @@ from neutron.plugins.common import constants as service_constants LOG = logging.getLogger(__name__) -class L3AgentNotifyAPI(n_rpc.RpcProxy): +class L3AgentNotifyAPI(object): """API for plugin to notify L3 agent.""" - BASE_RPC_API_VERSION = '1.0' def __init__(self, topic=topics.L3_AGENT): - super(L3AgentNotifyAPI, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) def _notification_host(self, context, method, payload, host): """Notify the agent that is hosting the router.""" LOG.debug('Nofity agent at %(host)s the message ' '%(method)s', {'host': host, 'method': method}) - self.cast( - context, self.make_msg(method, - payload=payload), - topic='%s.%s' % (topics.L3_AGENT, host)) + cctxt = self.client.prepare(server=host) + cctxt.cast(context, method, payload=payload) def _agent_notification(self, context, method, router_ids, operation, shuffle_agents): @@ -65,11 +64,10 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy): {'topic': l3_agent.topic, 'host': l3_agent.host, 'method': method}) - self.cast( - context, self.make_msg(method, - routers=[router_id]), - topic='%s.%s' % (l3_agent.topic, l3_agent.host), - version='1.1') + cctxt = self.client.prepare(topic=l3_agent.topic, + server=l3_agent.host, + version='1.1') + cctxt.cast(context, method, routers=[router_id]) def _agent_notification_arp(self, context, method, router_id, operation, data): @@ -88,14 +86,15 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy): # TODO(murali): replace cast with fanout to avoid performance # issues at greater scale. for l3_agent in l3_agents: - topic = '%s.%s' % (l3_agent.topic, l3_agent.host) + log_topic = '%s.%s' % (l3_agent.topic, l3_agent.host) LOG.debug('Casting message %(method)s with topic %(topic)s', - {'topic': topic, 'method': method}) + {'topic': log_topic, 'method': method}) dvr_arptable = {'router_id': router_id, 'arp_table': data} - self.cast(context, - self.make_msg(method, payload=dvr_arptable), - topic=topic, version='1.2') + cctxt = self.client.prepare(topic=l3_agent.topic, + server=l3_agent.host, + version='1.2') + cctxt.cast(context, method, payload=dvr_arptable) def _notification(self, context, method, router_ids, operation, shuffle_agents): @@ -114,10 +113,8 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy): self._agent_notification( context, method, router_ids, operation, shuffle_agents) else: - self.fanout_cast( - context, self.make_msg(method, - routers=router_ids), - topic=topics.L3_AGENT) + cctxt = self.client.prepare(fanout=True) + cctxt.cast(context, method, routers=router_ids) def _notification_fanout(self, context, method, router_id): """Fanout the deleted router to all L3 agents.""" @@ -126,10 +123,8 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy): {'topic': topics.L3_AGENT, 'method': method, 'router_id': router_id}) - self.fanout_cast( - context, self.make_msg(method, - router_id=router_id), - topic=topics.L3_AGENT) + cctxt = self.client.prepare(fanout=True) + cctxt.cast(context, method, router_id=router_id) def agent_updated(self, context, admin_state_up, host): self._notification_host(context, 'agent_updated', diff --git a/neutron/tests/unit/openvswitch/test_agent_scheduler.py b/neutron/tests/unit/openvswitch/test_agent_scheduler.py index 29dcab553f..7813d41e31 100644 --- a/neutron/tests/unit/openvswitch/test_agent_scheduler.py +++ b/neutron/tests/unit/openvswitch/test_agent_scheduler.py @@ -1361,20 +1361,23 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, l3_plugin = (manager.NeutronManager.get_service_plugins() [service_constants.L3_ROUTER_NAT]) l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] - with mock.patch.object(l3_notifier, 'cast') as mock_l3: - with self.router() as router1: - self._register_agent_states() - hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, - L3_HOSTA) - self._add_router_to_l3_agent(hosta_id, - router1['router']['id']) - routers = [router1['router']['id']] - mock_l3.assert_called_with( - mock.ANY, - l3_notifier.make_msg( - 'router_added_to_agent', - payload=routers), - topic='l3_agent.hosta') + with contextlib.nested( + mock.patch.object(l3_notifier.client, 'prepare', + return_value=l3_notifier.client), + mock.patch.object(l3_notifier.client, 'cast'), + self.router(), + ) as ( + mock_prepare, mock_cast, router1 + ): + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + routers = [router1['router']['id']] + mock_prepare.assert_called_with(server='hosta') + mock_cast.assert_called_with( + mock.ANY, 'router_added_to_agent', payload=routers) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 'l3_agent.router.add' self._assert_notify(notifications, expected_event_type) @@ -1383,20 +1386,25 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, l3_plugin = (manager.NeutronManager.get_service_plugins() [service_constants.L3_ROUTER_NAT]) l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] - with mock.patch.object(l3_notifier, 'cast') as mock_l3: - with self.router() as router1: - self._register_agent_states() - hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, - L3_HOSTA) - self._add_router_to_l3_agent(hosta_id, - router1['router']['id']) - self._remove_router_from_l3_agent(hosta_id, - router1['router']['id']) - mock_l3.assert_called_with( - mock.ANY, l3_notifier.make_msg( - 'router_removed_from_agent', - payload={'router_id': router1['router']['id']}), - topic='l3_agent.hosta') + with contextlib.nested( + mock.patch.object(l3_notifier.client, 'prepare', + return_value=l3_notifier.client), + mock.patch.object(l3_notifier.client, 'cast'), + self.router(), + ) as ( + mock_prepare, mock_cast, router1 + ): + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + self._remove_router_from_l3_agent(hosta_id, + router1['router']['id']) + mock_prepare.assert_called_with(server='hosta') + mock_cast.assert_called_with( + mock.ANY, 'router_removed_from_agent', + payload={'router_id': router1['router']['id']}) notifications = fake_notifier.NOTIFICATIONS expected_event_type = 'l3_agent.router.remove' self._assert_notify(notifications, expected_event_type) @@ -1405,12 +1413,19 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, l3_plugin = (manager.NeutronManager.get_service_plugins() [service_constants.L3_ROUTER_NAT]) l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] - with mock.patch.object(l3_notifier, 'cast') as mock_l3: + with contextlib.nested( + mock.patch.object(l3_notifier.client, 'prepare', + return_value=l3_notifier.client), + mock.patch.object(l3_notifier.client, 'cast'), + ) as ( + mock_prepare, mock_cast + ): self._register_agent_states() hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, L3_HOSTA) self._disable_agent(hosta_id, admin_state_up=False) - mock_l3.assert_called_with( - mock.ANY, l3_notifier.make_msg( - 'agent_updated', payload={'admin_state_up': False}), - topic='l3_agent.hosta') + + mock_prepare.assert_called_with(server='hosta') + + mock_cast.assert_called_with( + mock.ANY, 'agent_updated', payload={'admin_state_up': False})