Fix Metering doesn't respect the l3 agent binding

This patch fix the issue by changing the call to
find the plugin which handles the l3 which is now
the l3_router service plugin instead of the old mixin.

Also change the unit tests to use the l3 service plugin
instead of the l3 mixin and refactor the rpc callbacks
part.

Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>
Closes-bug: #1257354
Change-Id: Ide26f825005fa63cd3fcc75fa91fffb947e0be7a
This commit is contained in:
Sylvain Afchain 2014-01-07 10:36:58 +01:00
parent 346d674a10
commit aa07ab62af
7 changed files with 235 additions and 65 deletions

View File

@ -20,6 +20,7 @@ from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import proxy
from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
@ -35,7 +36,8 @@ class MeteringAgentNotifyAPI(proxy.RpcProxy):
def _agent_notification(self, context, method, routers):
"""Notify l3 metering agents hosted by l3 agent hosts."""
adminContext = context.is_admin and context or context.elevated()
plugin = manager.NeutronManager.get_plugin()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
l3_routers = {}
for router in routers:
@ -71,7 +73,8 @@ class MeteringAgentNotifyAPI(proxy.RpcProxy):
def _notification(self, context, method, routers):
"""Notify all the agents that are hosting the routers."""
plugin = manager.NeutronManager.get_plugin()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if utils.is_extension_supported(
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)

View File

@ -227,12 +227,13 @@ class MeteringDbMixin(metering.MeteringPluginBase,
return routers_dict.values()
def get_sync_data_metering(self, context, label_id=None):
with context.session.begin(subtransactions=True):
def get_sync_data_metering(self, context, label_id=None, router_ids=None):
labels = context.session.query(MeteringLabel)
if label_id:
label = self._get_by_id(context, MeteringLabel, label_id)
labels = [label]
else:
labels = self._get_collection_query(context, MeteringLabel)
labels = labels.filter(MeteringLabel.id == label_id)
elif router_ids:
labels = (labels.join(MeteringLabel.routers).
filter(l3_db.Router.id.in_(router_ids)))
return self._process_sync_metering_data(labels)

View File

@ -0,0 +1,59 @@
# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants as consts
from neutron.common import rpc as p_rpc
from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
class MeteringRpcCallbacks(object):
RPC_API_VERSION = '1.0'
def __init__(self, meter_plugin):
self.meter_plugin = meter_plugin
def create_rpc_dispatcher(self):
return p_rpc.PluginRpcDispatcher([self])
def get_sync_data_metering(self, context, **kwargs):
l3_plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if not l3_plugin:
return
host = kwargs.get('host')
if not utils.is_extension_supported(
l3_plugin, consts.L3_AGENT_SCHEDULER_EXT_ALIAS) or not host:
return self.meter_plugin.get_sync_data_metering(context)
else:
agents = l3_plugin.get_l3_agents(context, filters={'host': [host]})
if not agents:
LOG.error(_('Unable to find agent %s.'), host)
return
routers = l3_plugin.list_routers_on_l3_agent(context, agents[0].id)
router_ids = [router['id'] for router in routers['routers']]
if not router_ids:
return
return self.meter_plugin.get_sync_data_metering(context,
router_ids=router_ids)

View File

@ -88,7 +88,7 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager):
self.label_tenant_id = {}
self.routers = {}
self.metering_infos = {}
super(MeteringAgent, self).__init__(host=self.conf.host)
super(MeteringAgent, self).__init__(host=host)
def _load_drivers(self):
"""Loads plugin-driver from configuration."""

View File

@ -15,26 +15,12 @@
# under the License.
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import rpc as p_rpc
from neutron.common import topics
from neutron.db.metering import metering_db
from neutron.db.metering import metering_rpc
from neutron.openstack.common import rpc
class MeteringCallbacks(metering_db.MeteringDbMixin):
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return p_rpc.PluginRpcDispatcher([self])
def get_sync_data_metering(self, context, **kwargs):
return super(MeteringCallbacks, self).get_sync_data_metering(context)
class MeteringPlugin(metering_db.MeteringDbMixin):
"""Implementation of the Neutron Metering Service Plugin."""
supported_extension_aliases = ["metering"]
@ -42,7 +28,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
def __init__(self):
super(MeteringPlugin, self).__init__()
self.callbacks = MeteringCallbacks(self)
self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(

View File

@ -17,11 +17,16 @@
import mock
from neutron.api.v2 import attributes as attr
from neutron.common import constants as n_constants
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
from neutron.db import l3_agentschedulers_db
from neutron.db.metering import metering_rpc
from neutron.extensions import l3 as ext_l3
from neutron.extensions import metering as ext_metering
from neutron import manager
from neutron.openstack.common import timeutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
from neutron.tests.unit.db.metering import test_db_metering
@ -31,7 +36,7 @@ from neutron.tests.unit import test_l3_plugin
_uuid = uuidutils.generate_uuid
DB_METERING_PLUGIN_KLASS = (
METERING_SERVICE_PLUGIN_KLASS = (
"neutron.services.metering."
"metering_plugin.MeteringPlugin"
)
@ -65,8 +70,9 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
)
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
plugin = 'neutron.tests.unit.test_l3_plugin.TestL3NatIntPlugin'
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS}
ext_mgr = MeteringTestExtensionManager()
super(TestMeteringPlugin, self).setUp(plugin=plugin, ext_mgr=ext_mgr,
service_plugins=service_plugins)
@ -251,12 +257,8 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
self.assertEqual(tenant_id, router['router']['tenant_id'])
class TestRouteIntPlugin(l3_agentschedulers_db.L3AgentSchedulerDbMixin,
test_l3_plugin.TestL3NatIntPlugin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
class TestMeteringPluginL3AgentScheduler(
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
test_db_plugin.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseMixin,
test_db_metering.MeteringPluginDbTestCaseMixin):
@ -266,10 +268,18 @@ class TestMeteringPluginL3AgentScheduler(
for k in ext_metering.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
plugin_str = ('neutron.tests.unit.services.metering.'
'test_metering_plugin.TestRouteIntPlugin')
def setUp(self, plugin_str=None, service_plugins=None, scheduler=None):
if not plugin_str:
plugin_str = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatIntAgentSchedulingPlugin')
if not service_plugins:
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS}
if not scheduler:
scheduler = plugin_str
ext_mgr = MeteringTestExtensionManager()
super(TestMeteringPluginL3AgentScheduler,
self).setUp(plugin=plugin_str, ext_mgr=ext_mgr,
@ -291,7 +301,7 @@ class TestMeteringPluginL3AgentScheduler(
return_value=self.ctx)
self.mock_context = self.context_patch.start()
self.l3routers_patch = mock.patch(plugin_str +
self.l3routers_patch = mock.patch(scheduler +
'.get_l3_agents_hosting_routers')
self.l3routers_mock = self.l3routers_patch.start()
@ -299,7 +309,7 @@ class TestMeteringPluginL3AgentScheduler(
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected = {'args': {'routers': [{'status': 'ACTIVE',
expected1 = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
@ -307,8 +317,10 @@ class TestMeteringPluginL3AgentScheduler(
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid},
{'status': 'ACTIVE',
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected2 = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router2',
'gw_port_id': None,
'admin_state_up': True,
@ -320,9 +332,17 @@ class TestMeteringPluginL3AgentScheduler(
'namespace': None,
'method': 'add_metering_label'}
agent_host = 'l3_agent_host'
agent = agents_db.Agent(host=agent_host)
self.l3routers_mock.return_value = [agent]
# bind each router to a specific agent
agent1 = agents_db.Agent(host='agent1')
agent2 = agents_db.Agent(host='agent2')
agents = {self.uuid: agent1,
second_uuid: agent2}
def side_effect(context, routers, admin_state_up, active):
return [agents[routers[0]]]
self.l3routers_mock.side_effect = side_effect
with self.router(name='router1', tenant_id=self.tenant_id,
set_context=True):
@ -331,7 +351,99 @@ class TestMeteringPluginL3AgentScheduler(
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
topic = "%s.%s" % (self.topic, agent_host)
self.mock_cast.assert_called_with(self.ctx,
expected,
topic=topic)
topic1 = "%s.%s" % (self.topic, 'agent1')
topic2 = "%s.%s" % (self.topic, 'agent2')
# check if there is a call per agent
expected = [mock.call(self.ctx, expected1, topic=topic1),
mock.call(self.ctx, expected2, topic=topic2)]
self.mock_cast.assert_has_calls(expected, any_order=True)
class TestMeteringPluginL3AgentSchedulerServicePlugin(
TestMeteringPluginL3AgentScheduler):
"""Unit tests for the case where separate service plugin
implements L3 routing.
"""
def setUp(self):
l3_plugin = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatAgentSchedulingServicePlugin')
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS,
'l3_plugin_name': l3_plugin}
plugin_str = ('neutron.tests.unit.test_l3_plugin.'
'TestNoL3NatPlugin')
super(TestMeteringPluginL3AgentSchedulerServicePlugin, self).setUp(
plugin_str=plugin_str, service_plugins=service_plugins,
scheduler=l3_plugin)
class TestMeteringPluginRpcFromL3Agent(
test_db_plugin.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseMixin,
test_db_metering.MeteringPluginDbTestCaseMixin):
resource_prefix_map = dict(
(k.replace('_', '-'), constants.COMMON_PREFIXES[constants.METERING])
for k in ext_metering.RESOURCE_ATTRIBUTE_MAP
)
def setUp(self):
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS}
plugin = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatIntAgentSchedulingPlugin')
ext_mgr = MeteringTestExtensionManager()
super(TestMeteringPluginRpcFromL3Agent,
self).setUp(plugin=plugin, service_plugins=service_plugins,
ext_mgr=ext_mgr)
self.meter_plugin = manager.NeutronManager.get_service_plugins().get(
constants.METERING)
self.adminContext = context.get_admin_context()
self._register_l3_agent('agent1')
def _register_l3_agent(self, host):
agent = {
'binary': 'neutron-l3-agent',
'host': host,
'topic': topics.L3_AGENT,
'configurations': {},
'agent_type': n_constants.AGENT_TYPE_L3,
'start_flag': True
}
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': agent},
time=timeutils.strtime())
def test_get_sync_data_metering(self):
with self.subnet() as subnet:
s = subnet['subnet']
self._set_net_external(s['network_id'])
with self.router(name='router1', subnet=subnet) as router:
r = router['router']
self._add_external_gateway_to_router(r['id'], s['network_id'])
with self.metering_label(tenant_id=r['tenant_id']):
callbacks = metering_rpc.MeteringRpcCallbacks(
self.meter_plugin)
data = callbacks.get_sync_data_metering(self.adminContext,
host='agent1')
self.assertEqual('router1', data[0]['name'])
self._register_l3_agent('agent2')
data = callbacks.get_sync_data_metering(self.adminContext,
host='agent2')
self.assertFalse(data)
self._remove_external_gateway_from_router(
r['id'], s['network_id'])

View File

@ -293,6 +293,15 @@ class TestL3NatServicePlugin(db_base_plugin_v2.CommonDbMixin,
return "L3 Routing Service Plugin for testing"
# A L3 routing with L3 agent scheduling service plugin class for tests with
# plugins that delegate away L3 routing functionality
class TestL3NatAgentSchedulingServicePlugin(TestL3NatServicePlugin,
l3_agentschedulers_db.
L3AgentSchedulerDbMixin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
class L3NatTestCaseMixin(object):
def _create_router(self, fmt, tenant_id, name=None,