Add Neutron l3 metering agent

This is the agent side of :
blueprint bandwidth-router-measurement
blueprint bandwidth-router-label

This patch initiates agent side by adding a new
l3 metering agent and a Noop Driver.

DocImpact

Change-Id: Ib34721209c282b2669ff5488a33293d9f86467ef
This commit is contained in:
Sylvain Afchain 2013-07-04 16:00:30 +02:00
parent 4ec01c43d0
commit 4b5a2000b5
11 changed files with 576 additions and 1 deletions

12
etc/metering_agent.ini Normal file
View File

@ -0,0 +1,12 @@
[DEFAULT]
# Show debugging output in log (sets DEBUG log level output)
# debug = True
# driver = neutron.services.metering.drivers.iptables.iptables_driver.IptablesMeteringDriver
# Interval between two metering measures
# measure_interval = 30
# Interval between two metering reports
# report_interval = 300

View File

@ -71,6 +71,7 @@ AGENT_TYPE_NEC = 'NEC plugin agent'
AGENT_TYPE_L3 = 'L3 agent' AGENT_TYPE_L3 = 'L3 agent'
AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent' AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
AGENT_TYPE_MLNX = 'Mellanox plugin agent' AGENT_TYPE_MLNX = 'Mellanox plugin agent'
AGENT_TYPE_METERING = 'Metering agent'
L2_AGENT_TOPIC = 'N/A' L2_AGENT_TOPIC = 'N/A'
PAGINATION_INFINITE = 'infinite' PAGINATION_INFINITE = 'infinite'

View File

@ -31,7 +31,6 @@ METERING_PLUGIN = 'q-metering-plugin'
L3_AGENT = 'l3_agent' L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent' DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent' METERING_AGENT = 'metering_agent'
METERING_PLUGIN = 'metering_plugin'
def get_topic_name(prefix, table, operation): def get_topic_name(prefix, table, operation):

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,293 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import eventlet
from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as constants
from neutron.common import topics
from neutron.common import utils
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import periodic_task
from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import service
from neutron import service as neutron_service
LOG = logging.getLogger(__name__)
class MeteringPluginRpc(proxy.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, host):
super(MeteringPluginRpc,
self).__init__(topic=topics.METERING_AGENT,
default_version=self.BASE_RPC_API_VERSION)
def _get_sync_data_metering(self, context):
try:
return self.call(context,
self.make_msg('get_sync_data_metering',
host=self.host),
topic=topics.METERING_PLUGIN)
except Exception:
LOG.exception(_("Failed synchronizing routers"))
class MeteringAgent(MeteringPluginRpc, manager.Manager):
Opts = [
cfg.StrOpt('driver',
default='neutron.services.metering.drivers.noop.'
'noop_driver.NoopMeteringDriver',
help=_("Metering driver")),
cfg.IntOpt('measure_interval', default=30,
help=_("Interval between two metering measures")),
cfg.IntOpt('report_interval', default=300,
help=_("Interval between two metering reports")),
]
def __init__(self, host, conf=None):
self.conf = conf or cfg.CONF
self._load_drivers()
self.root_helper = config.get_root_helper(self.conf)
self.context = context.get_admin_context_without_session()
self.metering_info = {}
self.metering_loop = loopingcall.FixedIntervalLoopingCall(
self._metering_loop
)
measure_interval = self.conf.measure_interval
self.last_report = 0
self.metering_loop.start(interval=measure_interval)
self.host = host
self.label_tenant_id = {}
self.routers = {}
self.metering_infos = {}
super(MeteringAgent, self).__init__(host=self.conf.host)
def _load_drivers(self):
"""Loads plugin-driver from configuration."""
LOG.info(_("Loading Metering driver %s") % self.conf.driver)
if not self.conf.driver:
raise SystemExit(_('A metering driver must be specified'))
self.metering_driver = importutils.import_object(
self.conf.driver, self, self.conf)
def _metering_notification(self):
for label_id, info in self.metering_infos.items():
data = {'label_id': label_id,
'tenant_id': self.label_tenant_id.get(label_id),
'pkts': info['pkts'],
'bytes': info['bytes'],
'time': info['time'],
'first_update': info['first_update'],
'last_update': info['last_update'],
'host': self.host}
LOG.debug("Send metering report: %s" % data)
notifier_api.notify(self.context,
notifier_api.publisher_id('metering'),
'l3.meter',
notifier_api.CONF.default_notification_level,
data)
info['pkts'] = 0
info['bytes'] = 0
info['time'] = 0
def _purge_metering_info(self):
ts = int(time.time())
report_interval = self.conf.report_interval
for label_id, info in self.metering_info.items():
if info['last_update'] > ts + report_interval:
del self.metering_info[label_id]
def _add_metering_info(self, label_id, pkts, bytes):
ts = int(time.time())
info = self.metering_infos.get(label_id, {'bytes': 0,
'pkts': 0,
'time': 0,
'first_update': ts,
'last_update': ts})
info['bytes'] += bytes
info['pkts'] += pkts
info['time'] += ts - info['last_update']
info['last_update'] = ts
self.metering_infos[label_id] = info
return info
def _add_metering_infos(self):
self.label_tenant_id = {}
for router in self.routers.values():
tenant_id = router['tenant_id']
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
self.label_tenant_id[label_id] = tenant_id
tenant_id = self.label_tenant_id.get
accs = self._get_traffic_counters(self.context, self.routers.values())
if not accs:
return
for label_id, acc in accs.items():
self._add_metering_info(label_id, acc['pkts'], acc['bytes'])
def _metering_loop(self):
self._add_metering_infos()
ts = int(time.time())
delta = ts - self.last_report
report_interval = self.conf.report_interval
if delta > report_interval:
self._metering_notification()
self._purge_metering_info()
self.last_report = ts
@utils.synchronized('metering-agent')
def _invoke_driver(self, context, meterings, func_name):
try:
return getattr(self.metering_driver, func_name)(context, meterings)
except RuntimeError:
LOG.exception(_("Driver %(driver)s does not implement %(func)s"),
{'driver': cfg.CONF.metering_driver,
'func': func_name})
@periodic_task.periodic_task(run_immediately=True)
def _sync_routers_task(self, context):
routers = self._get_sync_data_metering(self.context)
if not routers:
return
self._update_routers(context, routers)
def router_deleted(self, context, router_id):
self._add_metering_infos()
if router_id in self.routers:
del self.routers[router_id]
return self._invoke_driver(context, router_id,
'remove_router')
def routers_updated(self, context, routers=None):
if not routers:
routers = self._get_sync_data_metering(self.context)
if not routers:
return
self._update_routers(context, routers)
def _update_routers(self, context, routers):
for router in routers:
self.routers[router['id']] = router
return self._invoke_driver(context, routers,
'update_routers')
def _get_traffic_counters(self, context, routers):
LOG.debug(_("Get router traffic counters"))
return self._invoke_driver(context, routers, 'get_traffic_counters')
def update_metering_label_rules(self, context, routers):
LOG.debug(_("Update metering rules from agent"))
return self._invoke_driver(context, routers,
'update_metering_label_rules')
def add_metering_label(self, context, routers):
LOG.debug(_("Creating a metering label from agent"))
return self._invoke_driver(context, routers,
'add_metering_label')
def remove_metering_label(self, context, routers):
self._add_metering_infos()
LOG.debug(_("Delete a metering label from agent"))
return self._invoke_driver(context, routers,
'remove_metering_label')
class MeteringAgentWithStateReport(MeteringAgent):
def __init__(self, host, conf=None):
super(MeteringAgentWithStateReport, self).__init__(host=host,
conf=conf)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.agent_state = {
'binary': 'neutron-metering-agent',
'host': host,
'topic': topics.METERING_AGENT,
'configurations': {
'metering_driver': self.conf.driver,
'measure_interval':
self.conf.measure_interval,
'report_interval': self.conf.report_interval
},
'start_flag': True,
'agent_type': constants.AGENT_TYPE_METERING}
report_interval = cfg.CONF.AGENT.report_interval
self.use_call = True
if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.heartbeat.start(interval=report_interval)
def _report_state(self):
try:
self.state_rpc.report_state(self.context, self.agent_state,
self.use_call)
self.agent_state.pop('start_flag', None)
self.use_call = False
except AttributeError:
# This means the server does not support report_state
LOG.warn(_("Neutron server does not support state report."
" State report for this agent will be disabled."))
self.heartbeat.stop()
return
except Exception:
LOG.exception(_("Failed reporting state!"))
def agent_updated(self, context, payload):
LOG.info(_("agent_updated by server side %s!"), payload)
def main():
eventlet.monkey_patch()
conf = cfg.CONF
conf.register_opts(MeteringAgent.Opts)
config.register_agent_state_opts_helper(conf)
config.register_root_helper(conf)
conf(project='neutron')
config.setup_logging(conf)
server = neutron_service.Service.create(
binary='neutron-metering-agent',
topic=topics.METERING_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.services.metering.agents.'
'metering_agent.MeteringAgentWithStateReport')
service.launch(server).wait()

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,49 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class MeteringAbstractDriver(object):
"""Abstract Metering driver."""
__metaclass__ = abc.ABCMeta
def __init__(self, plugin, conf):
pass
@abc.abstractmethod
def update_routers(self, context, routers):
pass
@abc.abstractmethod
def remove_router(self, context, router_id):
pass
@abc.abstractmethod
def update_metering_label_rules(self, context, routers):
pass
@abc.abstractmethod
def add_metering_label(self, context, routers):
pass
@abc.abstractmethod
def remove_metering_label(self, context, routers):
pass
@abc.abstractmethod
def get_traffic_counters(self, context, routers):
pass

View File

@ -0,0 +1,15 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,45 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import log
from neutron.services.metering.drivers import abstract_driver
class NoopMeteringDriver(abstract_driver.MeteringAbstractDriver):
@log.log
def update_routers(self, context, routers):
pass
@log.log
def remove_router(self, context, router_id):
pass
@log.log
def update_metering_label_rules(self, context, routers):
pass
@log.log
def add_metering_label(self, context, routers):
pass
@log.log
def remove_metering_label(self, context, routers):
pass
@log.log
def get_traffic_counters(self, context, routers):
pass

View File

@ -0,0 +1,130 @@
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo.config import cfg
from neutron.agent.common import config
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
_uuid = uuidutils.generate_uuid
TENANT_ID = _uuid()
LABEL_ID = _uuid()
ROUTERS = [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': TENANT_ID,
'_metering_labels': [{'rules': [],
'id': LABEL_ID}],
'id': _uuid()}]
class TestMeteringOperations(base.BaseTestCase):
def setUp(self):
super(TestMeteringOperations, self).setUp()
cfg.CONF.register_opts(metering_agent.MeteringAgent.Opts)
config.register_root_helper(cfg.CONF)
self.noop_driver = ('neutron.services.metering.drivers.noop.'
'noop_driver.NoopMeteringDriver')
cfg.CONF.set_override('driver', self.noop_driver)
cfg.CONF.set_override('measure_interval', 0)
cfg.CONF.set_override('report_interval', 0)
notifier_api._drivers = None
cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
metering_rpc = ('neutron.services.metering.agents.metering_agent.'
'MeteringPluginRpc._get_sync_data_metering')
self.metering_rpc_patch = mock.patch(metering_rpc, return_value=[])
self.metering_rpc_patch.start()
self.driver_patch = mock.patch(self.noop_driver, autospec=True)
self.driver_patch.start()
self.agent = metering_agent.MeteringAgent('my agent', cfg.CONF)
self.driver = self.agent.metering_driver
self.addCleanup(mock.patch.stopall)
def tearDown(self):
test_notifier.NOTIFICATIONS = []
super(TestMeteringOperations, self).tearDown()
def test_add_metering_label(self):
self.agent.add_metering_label(None, ROUTERS)
self.assertEqual(self.driver.add_metering_label.call_count, 1)
def test_remove_metering_label(self):
self.agent.remove_metering_label(None, ROUTERS)
self.assertEqual(self.driver.remove_metering_label.call_count, 1)
def test_update_metering_label_rule(self):
self.agent.update_metering_label_rules(None, ROUTERS)
self.assertEqual(self.driver.update_metering_label_rules.call_count, 1)
def test_routers_updated(self):
self.agent.routers_updated(None, ROUTERS)
self.assertEqual(self.driver.update_routers.call_count, 1)
def test_get_traffic_counters(self):
self.agent._get_traffic_counters(None, ROUTERS)
self.assertEqual(self.driver.get_traffic_counters.call_count, 1)
def test_notification_report(self):
self.agent.routers_updated(None, ROUTERS)
self.driver.get_traffic_counters.return_value = {LABEL_ID:
{'pkts': 88,
'bytes': 444}}
self.agent._metering_loop()
self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0)
for n in test_notifier.NOTIFICATIONS:
if n['event_type'] == 'l3.meter':
break
self.assertEqual(n['event_type'], 'l3.meter')
payload = n['payload']
self.assertEqual(payload['tenant_id'], TENANT_ID)
self.assertEqual(payload['label_id'], LABEL_ID)
self.assertEqual(payload['pkts'], 88)
self.assertEqual(payload['bytes'], 444)
def test_router_deleted(self):
label_id = _uuid()
self.driver.get_traffic_counters = mock.MagicMock()
self.driver.get_traffic_counters.return_value = {label_id:
{'pkts': 44,
'bytes': 222}}
self.agent._add_metering_info = mock.MagicMock()
self.agent.routers_updated(None, ROUTERS)
self.agent.router_deleted(None, ROUTERS[0]['id'])
self.assertEqual(self.agent._add_metering_info.call_count, 1)
self.assertEqual(self.driver.remove_router.call_count, 1)
self.agent._add_metering_info.assert_called_with(label_id, 44, 222)

View File

@ -107,6 +107,7 @@ console_scripts =
quantum-server = neutron.server:main quantum-server = neutron.server:main
quantum-rootwrap = neutron.openstack.common.rootwrap.cmd:main quantum-rootwrap = neutron.openstack.common.rootwrap.cmd:main
quantum-usage-audit = neutron.cmd.usage_audit:main quantum-usage-audit = neutron.cmd.usage_audit:main
neutron-metering-agent = neutron.services.metering.agents.metering_agent:main
neutron.ml2.type_drivers = neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver