diff --git a/neutron/api/rpc/agentnotifiers/metering_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/metering_rpc_agent_api.py index 3543ebe538..dfecbaff8e 100644 --- a/neutron/api/rpc/agentnotifiers/metering_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/metering_rpc_agent_api.py @@ -20,6 +20,7 @@ from neutron.common import utils from neutron import manager from neutron.openstack.common import log as logging from neutron.openstack.common.rpc import proxy +from neutron.plugins.common import constants as service_constants LOG = logging.getLogger(__name__) @@ -35,7 +36,8 @@ class MeteringAgentNotifyAPI(proxy.RpcProxy): def _agent_notification(self, context, method, routers): """Notify l3 metering agents hosted by l3 agent hosts.""" adminContext = context.is_admin and context or context.elevated() - plugin = manager.NeutronManager.get_plugin() + plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) l3_routers = {} for router in routers: @@ -71,7 +73,8 @@ class MeteringAgentNotifyAPI(proxy.RpcProxy): def _notification(self, context, method, routers): """Notify all the agents that are hosting the routers.""" - plugin = manager.NeutronManager.get_plugin() + plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) if utils.is_extension_supported( plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS): self._agent_notification(context, method, routers) diff --git a/neutron/db/metering/metering_db.py b/neutron/db/metering/metering_db.py index e97d90ea58..fe48ae4fd0 100644 --- a/neutron/db/metering/metering_db.py +++ b/neutron/db/metering/metering_db.py @@ -227,12 +227,13 @@ class MeteringDbMixin(metering.MeteringPluginBase, return routers_dict.values() - def get_sync_data_metering(self, context, label_id=None): - with context.session.begin(subtransactions=True): - if label_id: - label = self._get_by_id(context, MeteringLabel, label_id) - labels = [label] - else: - labels = self._get_collection_query(context, MeteringLabel) + def get_sync_data_metering(self, context, label_id=None, router_ids=None): + labels = context.session.query(MeteringLabel) + + if label_id: + labels = labels.filter(MeteringLabel.id == label_id) + elif router_ids: + labels = (labels.join(MeteringLabel.routers). + filter(l3_db.Router.id.in_(router_ids))) return self._process_sync_metering_data(labels) diff --git a/neutron/db/metering/metering_rpc.py b/neutron/db/metering/metering_rpc.py new file mode 100644 index 0000000000..82e7d3dd1f --- /dev/null +++ b/neutron/db/metering/metering_rpc.py @@ -0,0 +1,59 @@ +# Copyright (C) 2014 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import constants as consts +from neutron.common import rpc as p_rpc +from neutron.common import utils +from neutron import manager +from neutron.openstack.common import log as logging +from neutron.plugins.common import constants as service_constants + +LOG = logging.getLogger(__name__) + + +class MeteringRpcCallbacks(object): + + RPC_API_VERSION = '1.0' + + def __init__(self, meter_plugin): + self.meter_plugin = meter_plugin + + def create_rpc_dispatcher(self): + return p_rpc.PluginRpcDispatcher([self]) + + def get_sync_data_metering(self, context, **kwargs): + l3_plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) + if not l3_plugin: + return + + host = kwargs.get('host') + if not utils.is_extension_supported( + l3_plugin, consts.L3_AGENT_SCHEDULER_EXT_ALIAS) or not host: + return self.meter_plugin.get_sync_data_metering(context) + else: + agents = l3_plugin.get_l3_agents(context, filters={'host': [host]}) + if not agents: + LOG.error(_('Unable to find agent %s.'), host) + return + + routers = l3_plugin.list_routers_on_l3_agent(context, agents[0].id) + router_ids = [router['id'] for router in routers['routers']] + if not router_ids: + return + + return self.meter_plugin.get_sync_data_metering(context, + router_ids=router_ids) diff --git a/neutron/services/metering/agents/metering_agent.py b/neutron/services/metering/agents/metering_agent.py index f07a2d10e9..d736961c76 100644 --- a/neutron/services/metering/agents/metering_agent.py +++ b/neutron/services/metering/agents/metering_agent.py @@ -88,7 +88,7 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager): self.label_tenant_id = {} self.routers = {} self.metering_infos = {} - super(MeteringAgent, self).__init__(host=self.conf.host) + super(MeteringAgent, self).__init__(host=host) def _load_drivers(self): """Loads plugin-driver from configuration.""" diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index 15be0ebc05..af0c54be9f 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -15,26 +15,12 @@ # under the License. from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api -from neutron.common import rpc as p_rpc from neutron.common import topics from neutron.db.metering import metering_db +from neutron.db.metering import metering_rpc from neutron.openstack.common import rpc -class MeteringCallbacks(metering_db.MeteringDbMixin): - - RPC_API_VERSION = '1.0' - - def __init__(self, plugin): - self.plugin = plugin - - def create_rpc_dispatcher(self): - return p_rpc.PluginRpcDispatcher([self]) - - def get_sync_data_metering(self, context, **kwargs): - return super(MeteringCallbacks, self).get_sync_data_metering(context) - - class MeteringPlugin(metering_db.MeteringDbMixin): """Implementation of the Neutron Metering Service Plugin.""" supported_extension_aliases = ["metering"] @@ -42,7 +28,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin): def __init__(self): super(MeteringPlugin, self).__init__() - self.callbacks = MeteringCallbacks(self) + self.callbacks = metering_rpc.MeteringRpcCallbacks(self) self.conn = rpc.create_connection(new=True) self.conn.create_consumer( diff --git a/neutron/tests/unit/services/metering/test_metering_plugin.py b/neutron/tests/unit/services/metering/test_metering_plugin.py index 301b550945..f75d2768e9 100644 --- a/neutron/tests/unit/services/metering/test_metering_plugin.py +++ b/neutron/tests/unit/services/metering/test_metering_plugin.py @@ -17,11 +17,16 @@ import mock from neutron.api.v2 import attributes as attr +from neutron.common import constants as n_constants +from neutron.common import topics from neutron import context from neutron.db import agents_db from neutron.db import l3_agentschedulers_db +from neutron.db.metering import metering_rpc from neutron.extensions import l3 as ext_l3 from neutron.extensions import metering as ext_metering +from neutron import manager +from neutron.openstack.common import timeutils from neutron.openstack.common import uuidutils from neutron.plugins.common import constants from neutron.tests.unit.db.metering import test_db_metering @@ -31,7 +36,7 @@ from neutron.tests.unit import test_l3_plugin _uuid = uuidutils.generate_uuid -DB_METERING_PLUGIN_KLASS = ( +METERING_SERVICE_PLUGIN_KLASS = ( "neutron.services.metering." "metering_plugin.MeteringPlugin" ) @@ -65,8 +70,9 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase, ) def setUp(self): - service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS} plugin = 'neutron.tests.unit.test_l3_plugin.TestL3NatIntPlugin' + service_plugins = {'metering_plugin_name': + METERING_SERVICE_PLUGIN_KLASS} ext_mgr = MeteringTestExtensionManager() super(TestMeteringPlugin, self).setUp(plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) @@ -251,12 +257,8 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase, self.assertEqual(tenant_id, router['router']['tenant_id']) -class TestRouteIntPlugin(l3_agentschedulers_db.L3AgentSchedulerDbMixin, - test_l3_plugin.TestL3NatIntPlugin): - supported_extension_aliases = ["router", "l3_agent_scheduler"] - - class TestMeteringPluginL3AgentScheduler( + l3_agentschedulers_db.L3AgentSchedulerDbMixin, test_db_plugin.NeutronDbPluginV2TestCase, test_l3_plugin.L3NatTestCaseMixin, test_db_metering.MeteringPluginDbTestCaseMixin): @@ -266,10 +268,18 @@ class TestMeteringPluginL3AgentScheduler( for k in ext_metering.RESOURCE_ATTRIBUTE_MAP.keys() ) - def setUp(self): - service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS} - plugin_str = ('neutron.tests.unit.services.metering.' - 'test_metering_plugin.TestRouteIntPlugin') + def setUp(self, plugin_str=None, service_plugins=None, scheduler=None): + if not plugin_str: + plugin_str = ('neutron.tests.unit.test_l3_plugin.' + 'TestL3NatIntAgentSchedulingPlugin') + + if not service_plugins: + service_plugins = {'metering_plugin_name': + METERING_SERVICE_PLUGIN_KLASS} + + if not scheduler: + scheduler = plugin_str + ext_mgr = MeteringTestExtensionManager() super(TestMeteringPluginL3AgentScheduler, self).setUp(plugin=plugin_str, ext_mgr=ext_mgr, @@ -291,7 +301,7 @@ class TestMeteringPluginL3AgentScheduler( return_value=self.ctx) self.mock_context = self.context_patch.start() - self.l3routers_patch = mock.patch(plugin_str + + self.l3routers_patch = mock.patch(scheduler + '.get_l3_agents_hosting_routers') self.l3routers_mock = self.l3routers_patch.start() @@ -299,30 +309,40 @@ class TestMeteringPluginL3AgentScheduler( def test_add_metering_label_rpc_call(self): second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84' - expected = {'args': {'routers': [{'status': 'ACTIVE', - 'name': 'router1', - 'gw_port_id': None, - 'admin_state_up': True, - 'tenant_id': self.tenant_id, - '_metering_labels': [ - {'rules': [], - 'id': second_uuid}], - 'id': self.uuid}, - {'status': 'ACTIVE', - 'name': 'router2', - 'gw_port_id': None, - 'admin_state_up': True, - 'tenant_id': self.tenant_id, - '_metering_labels': [ - {'rules': [], - 'id': second_uuid}], - 'id': second_uuid}]}, - 'namespace': None, - 'method': 'add_metering_label'} + expected1 = {'args': {'routers': [{'status': 'ACTIVE', + 'name': 'router1', + 'gw_port_id': None, + 'admin_state_up': True, + 'tenant_id': self.tenant_id, + '_metering_labels': [ + {'rules': [], + 'id': second_uuid}], + 'id': self.uuid}]}, + 'namespace': None, + 'method': 'add_metering_label'} + expected2 = {'args': {'routers': [{'status': 'ACTIVE', + 'name': 'router2', + 'gw_port_id': None, + 'admin_state_up': True, + 'tenant_id': self.tenant_id, + '_metering_labels': [ + {'rules': [], + 'id': second_uuid}], + 'id': second_uuid}]}, + 'namespace': None, + 'method': 'add_metering_label'} - agent_host = 'l3_agent_host' - agent = agents_db.Agent(host=agent_host) - self.l3routers_mock.return_value = [agent] + # bind each router to a specific agent + agent1 = agents_db.Agent(host='agent1') + agent2 = agents_db.Agent(host='agent2') + + agents = {self.uuid: agent1, + second_uuid: agent2} + + def side_effect(context, routers, admin_state_up, active): + return [agents[routers[0]]] + + self.l3routers_mock.side_effect = side_effect with self.router(name='router1', tenant_id=self.tenant_id, set_context=True): @@ -331,7 +351,99 @@ class TestMeteringPluginL3AgentScheduler( set_context=True): with self.metering_label(tenant_id=self.tenant_id, set_context=True): - topic = "%s.%s" % (self.topic, agent_host) - self.mock_cast.assert_called_with(self.ctx, - expected, - topic=topic) + + topic1 = "%s.%s" % (self.topic, 'agent1') + topic2 = "%s.%s" % (self.topic, 'agent2') + + # check if there is a call per agent + expected = [mock.call(self.ctx, expected1, topic=topic1), + mock.call(self.ctx, expected2, topic=topic2)] + + self.mock_cast.assert_has_calls(expected, any_order=True) + + +class TestMeteringPluginL3AgentSchedulerServicePlugin( + TestMeteringPluginL3AgentScheduler): + + """Unit tests for the case where separate service plugin + implements L3 routing. + """ + + def setUp(self): + l3_plugin = ('neutron.tests.unit.test_l3_plugin.' + 'TestL3NatAgentSchedulingServicePlugin') + service_plugins = {'metering_plugin_name': + METERING_SERVICE_PLUGIN_KLASS, + 'l3_plugin_name': l3_plugin} + + plugin_str = ('neutron.tests.unit.test_l3_plugin.' + 'TestNoL3NatPlugin') + + super(TestMeteringPluginL3AgentSchedulerServicePlugin, self).setUp( + plugin_str=plugin_str, service_plugins=service_plugins, + scheduler=l3_plugin) + + +class TestMeteringPluginRpcFromL3Agent( + test_db_plugin.NeutronDbPluginV2TestCase, + test_l3_plugin.L3NatTestCaseMixin, + test_db_metering.MeteringPluginDbTestCaseMixin): + + resource_prefix_map = dict( + (k.replace('_', '-'), constants.COMMON_PREFIXES[constants.METERING]) + for k in ext_metering.RESOURCE_ATTRIBUTE_MAP + ) + + def setUp(self): + service_plugins = {'metering_plugin_name': + METERING_SERVICE_PLUGIN_KLASS} + + plugin = ('neutron.tests.unit.test_l3_plugin.' + 'TestL3NatIntAgentSchedulingPlugin') + + ext_mgr = MeteringTestExtensionManager() + super(TestMeteringPluginRpcFromL3Agent, + self).setUp(plugin=plugin, service_plugins=service_plugins, + ext_mgr=ext_mgr) + + self.meter_plugin = manager.NeutronManager.get_service_plugins().get( + constants.METERING) + + self.adminContext = context.get_admin_context() + self._register_l3_agent('agent1') + + def _register_l3_agent(self, host): + agent = { + 'binary': 'neutron-l3-agent', + 'host': host, + 'topic': topics.L3_AGENT, + 'configurations': {}, + 'agent_type': n_constants.AGENT_TYPE_L3, + 'start_flag': True + } + callback = agents_db.AgentExtRpcCallback() + callback.report_state(self.adminContext, + agent_state={'agent_state': agent}, + time=timeutils.strtime()) + + def test_get_sync_data_metering(self): + with self.subnet() as subnet: + s = subnet['subnet'] + self._set_net_external(s['network_id']) + with self.router(name='router1', subnet=subnet) as router: + r = router['router'] + self._add_external_gateway_to_router(r['id'], s['network_id']) + with self.metering_label(tenant_id=r['tenant_id']): + callbacks = metering_rpc.MeteringRpcCallbacks( + self.meter_plugin) + data = callbacks.get_sync_data_metering(self.adminContext, + host='agent1') + self.assertEqual('router1', data[0]['name']) + + self._register_l3_agent('agent2') + data = callbacks.get_sync_data_metering(self.adminContext, + host='agent2') + self.assertFalse(data) + + self._remove_external_gateway_from_router( + r['id'], s['network_id']) diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index 4c81aa8072..02e0237912 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -293,6 +293,15 @@ class TestL3NatServicePlugin(db_base_plugin_v2.CommonDbMixin, return "L3 Routing Service Plugin for testing" +# A L3 routing with L3 agent scheduling service plugin class for tests with +# plugins that delegate away L3 routing functionality +class TestL3NatAgentSchedulingServicePlugin(TestL3NatServicePlugin, + l3_agentschedulers_db. + L3AgentSchedulerDbMixin): + + supported_extension_aliases = ["router", "l3_agent_scheduler"] + + class L3NatTestCaseMixin(object): def _create_router(self, fmt, tenant_id, name=None,