Merge "Drop RpcProxy usage from MeteringAgentNotifyAPI"

This commit is contained in:
Jenkins 2014-11-28 22:22:12 +00:00 committed by Gerrit Code Review
commit 2e73eac6a8
2 changed files with 150 additions and 160 deletions

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
@ -23,13 +25,13 @@ from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
class MeteringAgentNotifyAPI(n_rpc.RpcProxy):
class MeteringAgentNotifyAPI(object):
"""API for plugin to notify L3 metering agent."""
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.METERING_AGENT):
super(MeteringAgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic = topic
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def _agent_notification(self, context, method, routers):
"""Notify l3 metering agents hosted by l3 agent hosts."""
@ -55,8 +57,8 @@ class MeteringAgentNotifyAPI(n_rpc.RpcProxy):
l3_routers[l3_agent.host] = l3_router
for host, routers in l3_routers.iteritems():
self.cast(context, self.make_msg(method, routers=routers),
topic='%s.%s' % (self.topic, host))
cctxt = self.client.prepare(server=host)
cctxt.cast(context, method, routers=routers)
def _notification_fanout(self, context, method, router_id):
LOG.debug('Fanout notify metering agent at %(topic)s the message '
@ -64,9 +66,8 @@ class MeteringAgentNotifyAPI(n_rpc.RpcProxy):
{'topic': self.topic,
'method': method,
'router_id': router_id})
self.fanout_cast(
context, self.make_msg(method,
router_id=router_id))
cctxt = self.client.prepare(fanout=True)
cctxt.cast(context, method, router_id=router_id)
def _notification(self, context, method, routers):
"""Notify all the agents that are hosting the routers."""
@ -76,7 +77,8 @@ class MeteringAgentNotifyAPI(n_rpc.RpcProxy):
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)
else:
self.fanout_cast(context, self.make_msg(method, routers=routers))
cctxt = self.client.prepare(fanout=True)
cctxt.cast(context, method, routers=routers)
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)

View File

@ -81,10 +81,6 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
self.uuid_patch = mock.patch(uuid, return_value=self.uuid)
self.mock_uuid = self.uuid_patch.start()
fanout = ('neutron.common.rpc.RpcProxy.fanout_cast')
self.fanout_patch = mock.patch(fanout)
self.mock_fanout = self.fanout_patch.start()
self.tenant_id = 'a7e61382-47b8-4d40-bae3-f95981b5637b'
self.ctx = context.Context('', self.tenant_id, is_admin=True)
self.context_patch = mock.patch('neutron.context.Context',
@ -93,19 +89,35 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
self.topic = 'metering_agent'
add = ('neutron.api.rpc.agentnotifiers.' +
'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
'.add_metering_label')
self.add_patch = mock.patch(add)
self.mock_add = self.add_patch.start()
remove = ('neutron.api.rpc.agentnotifiers.' +
'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
'.remove_metering_label')
self.remove_patch = mock.patch(remove)
self.mock_remove = self.remove_patch.start()
update = ('neutron.api.rpc.agentnotifiers.' +
'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
'.update_metering_label_rules')
self.update_patch = mock.patch(update)
self.mock_update = self.update_patch.start()
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid}],
'id': self.uuid}]
tenant_id_2 = '8a268a58-1610-4890-87e0-07abb8231206'
self.mock_uuid.return_value = second_uuid
@ -116,23 +128,21 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected)
self.mock_add.assert_called_with(self.ctx, expected)
def test_add_metering_label_shared_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid},
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid},
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]
tenant_id_2 = '8a268a58-1610-4890-87e0-07abb8231206'
with self.router(name='router1', tenant_id=self.tenant_id,
@ -142,57 +152,47 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=tenant_id_2, shared=True,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected)
self.mock_add.assert_called_with(self.ctx, expected)
def test_remove_metering_label_rpc_call(self):
expected = {'args':
{'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid}],
'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected)
expected['method'] = 'remove_metering_label'
self.mock_fanout.assert_called_with(self.ctx, expected)
self.mock_add.assert_called_with(self.ctx, expected)
self.mock_remove.assert_called_with(self.ctx, expected)
def test_remove_one_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected_add = {'args':
{'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid},
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected_remove = {'args':
{'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'remove_metering_label'}
expected_add = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': self.uuid},
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]
expected_remove = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
@ -200,53 +200,45 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
self.mock_fanout.assert_called_with(self.ctx, expected_add)
self.mock_fanout.assert_called_with(self.ctx, expected_remove)
self.mock_add.assert_called_with(self.ctx, expected_add)
self.mock_remove.assert_called_with(self.ctx, expected_remove)
def test_update_metering_label_rules_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected_add = {'args':
{'routers': [
{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': self.uuid},
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'egress',
'metering_label_id': self.uuid,
'excluded': False,
'id': second_uuid}],
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'update_metering_label_rules'}
expected_add = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': self.uuid},
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'egress',
'metering_label_id': self.uuid,
'excluded': False,
'id': second_uuid}],
'id': self.uuid}],
'id': self.uuid}]
expected_del = {'args':
{'routers': [
{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': self.uuid}],
expected_del = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [
{'remote_ip_prefix': '10.0.0.0/24',
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': self.uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'update_metering_label_rules'}
'id': self.uuid}],
'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
@ -255,9 +247,9 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
with self.metering_label_rule(l['id']):
self.mock_uuid.return_value = second_uuid
with self.metering_label_rule(l['id'], direction='egress'):
self.mock_fanout.assert_called_with(self.ctx,
self.mock_update.assert_called_with(self.ctx,
expected_add)
self.mock_fanout.assert_called_with(self.ctx,
self.mock_update.assert_called_with(self.ctx,
expected_del)
def test_delete_metering_label_does_not_clear_router_tenant_id(self):
@ -307,10 +299,6 @@ class TestMeteringPluginL3AgentScheduler(
self.uuid_patch = mock.patch(uuid, return_value=self.uuid)
self.mock_uuid = self.uuid_patch.start()
cast = 'neutron.common.rpc.RpcProxy.cast'
self.cast_patch = mock.patch(cast)
self.mock_cast = self.cast_patch.start()
self.tenant_id = 'a7e61382-47b8-4d40-bae3-f95981b5637b'
self.ctx = context.Context('', self.tenant_id, is_admin=True)
self.context_patch = mock.patch('neutron.context.Context',
@ -323,30 +311,38 @@ class TestMeteringPluginL3AgentScheduler(
self.topic = 'metering_agent'
add = ('neutron.api.rpc.agentnotifiers.' +
'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
'.add_metering_label')
self.add_patch = mock.patch(add)
self.mock_add = self.add_patch.start()
remove = ('neutron.api.rpc.agentnotifiers.' +
'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
'.remove_metering_label')
self.remove_patch = mock.patch(remove)
self.mock_remove = self.remove_patch.start()
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected1 = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected2 = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router2',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': second_uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid},
{'status': 'ACTIVE',
'name': 'router2',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': second_uuid}]
# bind each router to a specific agent
agent1 = agents_db.Agent(host='agent1')
@ -367,15 +363,7 @@ class TestMeteringPluginL3AgentScheduler(
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
topic1 = "%s.%s" % (self.topic, 'agent1')
topic2 = "%s.%s" % (self.topic, 'agent2')
# check if there is a call per agent
expected = [mock.call(self.ctx, expected1, topic=topic1),
mock.call(self.ctx, expected2, topic=topic2)]
self.mock_cast.assert_has_calls(expected, any_order=True)
self.mock_add.assert_called_with(self.ctx, expected)
class TestMeteringPluginL3AgentSchedulerServicePlugin(