diff --git a/etc/metering_agent.ini b/etc/metering_agent.ini new file mode 100644 index 0000000000..60aadc99a5 --- /dev/null +++ b/etc/metering_agent.ini @@ -0,0 +1,12 @@ +[DEFAULT] +# Show debugging output in log (sets DEBUG log level output) +# debug = True + +# driver = neutron.services.metering.drivers.iptables.iptables_driver.IptablesMeteringDriver + +# Interval between two metering measures +# measure_interval = 30 + +# Interval between two metering reports +# report_interval = 300 + diff --git a/neutron/common/constants.py b/neutron/common/constants.py index a41fe8553a..456b688b4a 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -71,6 +71,7 @@ AGENT_TYPE_NEC = 'NEC plugin agent' AGENT_TYPE_L3 = 'L3 agent' AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent' AGENT_TYPE_MLNX = 'Mellanox plugin agent' +AGENT_TYPE_METERING = 'Metering agent' L2_AGENT_TOPIC = 'N/A' PAGINATION_INFINITE = 'infinite' diff --git a/neutron/common/topics.py b/neutron/common/topics.py index 26d5fec7ee..9822aed750 100644 --- a/neutron/common/topics.py +++ b/neutron/common/topics.py @@ -31,7 +31,6 @@ METERING_PLUGIN = 'q-metering-plugin' L3_AGENT = 'l3_agent' DHCP_AGENT = 'dhcp_agent' METERING_AGENT = 'metering_agent' -METERING_PLUGIN = 'metering_plugin' def get_topic_name(prefix, table, operation): diff --git a/neutron/services/metering/agents/__init__.py b/neutron/services/metering/agents/__init__.py new file mode 100644 index 0000000000..82a4472130 --- /dev/null +++ b/neutron/services/metering/agents/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/services/metering/agents/metering_agent.py b/neutron/services/metering/agents/metering_agent.py new file mode 100644 index 0000000000..a61a8eb38d --- /dev/null +++ b/neutron/services/metering/agents/metering_agent.py @@ -0,0 +1,293 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +import eventlet +from oslo.config import cfg + +from neutron.agent.common import config +from neutron.agent import rpc as agent_rpc +from neutron.common import constants as constants +from neutron.common import topics +from neutron.common import utils +from neutron import context +from neutron import manager +from neutron.openstack.common import importutils +from neutron.openstack.common import log as logging +from neutron.openstack.common import loopingcall +from neutron.openstack.common.notifier import api as notifier_api +from neutron.openstack.common import periodic_task +from neutron.openstack.common.rpc import proxy +from neutron.openstack.common import service +from neutron import service as neutron_service + + +LOG = logging.getLogger(__name__) + + +class MeteringPluginRpc(proxy.RpcProxy): + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, host): + super(MeteringPluginRpc, + self).__init__(topic=topics.METERING_AGENT, + default_version=self.BASE_RPC_API_VERSION) + + def _get_sync_data_metering(self, context): + try: + return self.call(context, + self.make_msg('get_sync_data_metering', + host=self.host), + topic=topics.METERING_PLUGIN) + except Exception: + LOG.exception(_("Failed synchronizing routers")) + + +class MeteringAgent(MeteringPluginRpc, manager.Manager): + + Opts = [ + cfg.StrOpt('driver', + default='neutron.services.metering.drivers.noop.' + 'noop_driver.NoopMeteringDriver', + help=_("Metering driver")), + cfg.IntOpt('measure_interval', default=30, + help=_("Interval between two metering measures")), + cfg.IntOpt('report_interval', default=300, + help=_("Interval between two metering reports")), + ] + + def __init__(self, host, conf=None): + self.conf = conf or cfg.CONF + self._load_drivers() + self.root_helper = config.get_root_helper(self.conf) + self.context = context.get_admin_context_without_session() + self.metering_info = {} + self.metering_loop = loopingcall.FixedIntervalLoopingCall( + self._metering_loop + ) + measure_interval = self.conf.measure_interval + self.last_report = 0 + self.metering_loop.start(interval=measure_interval) + self.host = host + + self.label_tenant_id = {} + self.routers = {} + self.metering_infos = {} + super(MeteringAgent, self).__init__(host=self.conf.host) + + def _load_drivers(self): + """Loads plugin-driver from configuration.""" + LOG.info(_("Loading Metering driver %s") % self.conf.driver) + if not self.conf.driver: + raise SystemExit(_('A metering driver must be specified')) + self.metering_driver = importutils.import_object( + self.conf.driver, self, self.conf) + + def _metering_notification(self): + for label_id, info in self.metering_infos.items(): + data = {'label_id': label_id, + 'tenant_id': self.label_tenant_id.get(label_id), + 'pkts': info['pkts'], + 'bytes': info['bytes'], + 'time': info['time'], + 'first_update': info['first_update'], + 'last_update': info['last_update'], + 'host': self.host} + + LOG.debug("Send metering report: %s" % data) + notifier_api.notify(self.context, + notifier_api.publisher_id('metering'), + 'l3.meter', + notifier_api.CONF.default_notification_level, + data) + info['pkts'] = 0 + info['bytes'] = 0 + info['time'] = 0 + + def _purge_metering_info(self): + ts = int(time.time()) + report_interval = self.conf.report_interval + for label_id, info in self.metering_info.items(): + if info['last_update'] > ts + report_interval: + del self.metering_info[label_id] + + def _add_metering_info(self, label_id, pkts, bytes): + ts = int(time.time()) + info = self.metering_infos.get(label_id, {'bytes': 0, + 'pkts': 0, + 'time': 0, + 'first_update': ts, + 'last_update': ts}) + info['bytes'] += bytes + info['pkts'] += pkts + info['time'] += ts - info['last_update'] + info['last_update'] = ts + + self.metering_infos[label_id] = info + + return info + + def _add_metering_infos(self): + self.label_tenant_id = {} + for router in self.routers.values(): + tenant_id = router['tenant_id'] + labels = router.get(constants.METERING_LABEL_KEY, []) + for label in labels: + label_id = label['id'] + self.label_tenant_id[label_id] = tenant_id + + tenant_id = self.label_tenant_id.get + accs = self._get_traffic_counters(self.context, self.routers.values()) + if not accs: + return + + for label_id, acc in accs.items(): + self._add_metering_info(label_id, acc['pkts'], acc['bytes']) + + def _metering_loop(self): + self._add_metering_infos() + + ts = int(time.time()) + delta = ts - self.last_report + + report_interval = self.conf.report_interval + if delta > report_interval: + self._metering_notification() + self._purge_metering_info() + self.last_report = ts + + @utils.synchronized('metering-agent') + def _invoke_driver(self, context, meterings, func_name): + try: + return getattr(self.metering_driver, func_name)(context, meterings) + except RuntimeError: + LOG.exception(_("Driver %(driver)s does not implement %(func)s"), + {'driver': cfg.CONF.metering_driver, + 'func': func_name}) + + @periodic_task.periodic_task(run_immediately=True) + def _sync_routers_task(self, context): + routers = self._get_sync_data_metering(self.context) + if not routers: + return + self._update_routers(context, routers) + + def router_deleted(self, context, router_id): + self._add_metering_infos() + + if router_id in self.routers: + del self.routers[router_id] + + return self._invoke_driver(context, router_id, + 'remove_router') + + def routers_updated(self, context, routers=None): + if not routers: + routers = self._get_sync_data_metering(self.context) + if not routers: + return + self._update_routers(context, routers) + + def _update_routers(self, context, routers): + for router in routers: + self.routers[router['id']] = router + + return self._invoke_driver(context, routers, + 'update_routers') + + def _get_traffic_counters(self, context, routers): + LOG.debug(_("Get router traffic counters")) + return self._invoke_driver(context, routers, 'get_traffic_counters') + + def update_metering_label_rules(self, context, routers): + LOG.debug(_("Update metering rules from agent")) + return self._invoke_driver(context, routers, + 'update_metering_label_rules') + + def add_metering_label(self, context, routers): + LOG.debug(_("Creating a metering label from agent")) + return self._invoke_driver(context, routers, + 'add_metering_label') + + def remove_metering_label(self, context, routers): + self._add_metering_infos() + + LOG.debug(_("Delete a metering label from agent")) + return self._invoke_driver(context, routers, + 'remove_metering_label') + + +class MeteringAgentWithStateReport(MeteringAgent): + + def __init__(self, host, conf=None): + super(MeteringAgentWithStateReport, self).__init__(host=host, + conf=conf) + self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) + self.agent_state = { + 'binary': 'neutron-metering-agent', + 'host': host, + 'topic': topics.METERING_AGENT, + 'configurations': { + 'metering_driver': self.conf.driver, + 'measure_interval': + self.conf.measure_interval, + 'report_interval': self.conf.report_interval + }, + 'start_flag': True, + 'agent_type': constants.AGENT_TYPE_METERING} + report_interval = cfg.CONF.AGENT.report_interval + self.use_call = True + if report_interval: + self.heartbeat = loopingcall.FixedIntervalLoopingCall( + self._report_state) + self.heartbeat.start(interval=report_interval) + + def _report_state(self): + try: + self.state_rpc.report_state(self.context, self.agent_state, + self.use_call) + self.agent_state.pop('start_flag', None) + self.use_call = False + except AttributeError: + # This means the server does not support report_state + LOG.warn(_("Neutron server does not support state report." + " State report for this agent will be disabled.")) + self.heartbeat.stop() + return + except Exception: + LOG.exception(_("Failed reporting state!")) + + def agent_updated(self, context, payload): + LOG.info(_("agent_updated by server side %s!"), payload) + + +def main(): + eventlet.monkey_patch() + conf = cfg.CONF + conf.register_opts(MeteringAgent.Opts) + config.register_agent_state_opts_helper(conf) + config.register_root_helper(conf) + conf(project='neutron') + config.setup_logging(conf) + server = neutron_service.Service.create( + binary='neutron-metering-agent', + topic=topics.METERING_AGENT, + report_interval=cfg.CONF.AGENT.report_interval, + manager='neutron.services.metering.agents.' + 'metering_agent.MeteringAgentWithStateReport') + service.launch(server).wait() diff --git a/neutron/services/metering/drivers/__init__.py b/neutron/services/metering/drivers/__init__.py new file mode 100644 index 0000000000..82a4472130 --- /dev/null +++ b/neutron/services/metering/drivers/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/services/metering/drivers/abstract_driver.py b/neutron/services/metering/drivers/abstract_driver.py new file mode 100644 index 0000000000..a5e5ca193c --- /dev/null +++ b/neutron/services/metering/drivers/abstract_driver.py @@ -0,0 +1,49 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class MeteringAbstractDriver(object): + """Abstract Metering driver.""" + __metaclass__ = abc.ABCMeta + + def __init__(self, plugin, conf): + pass + + @abc.abstractmethod + def update_routers(self, context, routers): + pass + + @abc.abstractmethod + def remove_router(self, context, router_id): + pass + + @abc.abstractmethod + def update_metering_label_rules(self, context, routers): + pass + + @abc.abstractmethod + def add_metering_label(self, context, routers): + pass + + @abc.abstractmethod + def remove_metering_label(self, context, routers): + pass + + @abc.abstractmethod + def get_traffic_counters(self, context, routers): + pass diff --git a/neutron/services/metering/drivers/noop/__init__.py b/neutron/services/metering/drivers/noop/__init__.py new file mode 100644 index 0000000000..82a4472130 --- /dev/null +++ b/neutron/services/metering/drivers/noop/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/neutron/services/metering/drivers/noop/noop_driver.py b/neutron/services/metering/drivers/noop/noop_driver.py new file mode 100644 index 0000000000..d3f5e7df45 --- /dev/null +++ b/neutron/services/metering/drivers/noop/noop_driver.py @@ -0,0 +1,45 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import log +from neutron.services.metering.drivers import abstract_driver + + +class NoopMeteringDriver(abstract_driver.MeteringAbstractDriver): + + @log.log + def update_routers(self, context, routers): + pass + + @log.log + def remove_router(self, context, router_id): + pass + + @log.log + def update_metering_label_rules(self, context, routers): + pass + + @log.log + def add_metering_label(self, context, routers): + pass + + @log.log + def remove_metering_label(self, context, routers): + pass + + @log.log + def get_traffic_counters(self, context, routers): + pass diff --git a/neutron/tests/unit/services/metering/test_metering_agent.py b/neutron/tests/unit/services/metering/test_metering_agent.py new file mode 100644 index 0000000000..20c6bbd031 --- /dev/null +++ b/neutron/tests/unit/services/metering/test_metering_agent.py @@ -0,0 +1,130 @@ +# Copyright (C) 2013 eNovance SAS +# +# Author: Sylvain Afchain +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo.config import cfg + +from neutron.agent.common import config +from neutron.openstack.common.notifier import api as notifier_api +from neutron.openstack.common.notifier import test_notifier +from neutron.openstack.common import uuidutils +from neutron.services.metering.agents import metering_agent +from neutron.tests import base + + +_uuid = uuidutils.generate_uuid + +TENANT_ID = _uuid() +LABEL_ID = _uuid() +ROUTERS = [{'status': 'ACTIVE', + 'name': 'router1', + 'gw_port_id': None, + 'admin_state_up': True, + 'tenant_id': TENANT_ID, + '_metering_labels': [{'rules': [], + 'id': LABEL_ID}], + 'id': _uuid()}] + + +class TestMeteringOperations(base.BaseTestCase): + + def setUp(self): + super(TestMeteringOperations, self).setUp() + cfg.CONF.register_opts(metering_agent.MeteringAgent.Opts) + config.register_root_helper(cfg.CONF) + + self.noop_driver = ('neutron.services.metering.drivers.noop.' + 'noop_driver.NoopMeteringDriver') + cfg.CONF.set_override('driver', self.noop_driver) + cfg.CONF.set_override('measure_interval', 0) + cfg.CONF.set_override('report_interval', 0) + + notifier_api._drivers = None + cfg.CONF.set_override("notification_driver", [test_notifier.__name__]) + + metering_rpc = ('neutron.services.metering.agents.metering_agent.' + 'MeteringPluginRpc._get_sync_data_metering') + self.metering_rpc_patch = mock.patch(metering_rpc, return_value=[]) + self.metering_rpc_patch.start() + + self.driver_patch = mock.patch(self.noop_driver, autospec=True) + self.driver_patch.start() + + self.agent = metering_agent.MeteringAgent('my agent', cfg.CONF) + self.driver = self.agent.metering_driver + + self.addCleanup(mock.patch.stopall) + + def tearDown(self): + test_notifier.NOTIFICATIONS = [] + super(TestMeteringOperations, self).tearDown() + + def test_add_metering_label(self): + self.agent.add_metering_label(None, ROUTERS) + self.assertEqual(self.driver.add_metering_label.call_count, 1) + + def test_remove_metering_label(self): + self.agent.remove_metering_label(None, ROUTERS) + self.assertEqual(self.driver.remove_metering_label.call_count, 1) + + def test_update_metering_label_rule(self): + self.agent.update_metering_label_rules(None, ROUTERS) + self.assertEqual(self.driver.update_metering_label_rules.call_count, 1) + + def test_routers_updated(self): + self.agent.routers_updated(None, ROUTERS) + self.assertEqual(self.driver.update_routers.call_count, 1) + + def test_get_traffic_counters(self): + self.agent._get_traffic_counters(None, ROUTERS) + self.assertEqual(self.driver.get_traffic_counters.call_count, 1) + + def test_notification_report(self): + self.agent.routers_updated(None, ROUTERS) + + self.driver.get_traffic_counters.return_value = {LABEL_ID: + {'pkts': 88, + 'bytes': 444}} + self.agent._metering_loop() + + self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0) + for n in test_notifier.NOTIFICATIONS: + if n['event_type'] == 'l3.meter': + break + + self.assertEqual(n['event_type'], 'l3.meter') + + payload = n['payload'] + self.assertEqual(payload['tenant_id'], TENANT_ID) + self.assertEqual(payload['label_id'], LABEL_ID) + self.assertEqual(payload['pkts'], 88) + self.assertEqual(payload['bytes'], 444) + + def test_router_deleted(self): + label_id = _uuid() + self.driver.get_traffic_counters = mock.MagicMock() + self.driver.get_traffic_counters.return_value = {label_id: + {'pkts': 44, + 'bytes': 222}} + self.agent._add_metering_info = mock.MagicMock() + + self.agent.routers_updated(None, ROUTERS) + self.agent.router_deleted(None, ROUTERS[0]['id']) + + self.assertEqual(self.agent._add_metering_info.call_count, 1) + self.assertEqual(self.driver.remove_router.call_count, 1) + + self.agent._add_metering_info.assert_called_with(label_id, 44, 222) diff --git a/setup.cfg b/setup.cfg index 227d17e965..8ab7bcf317 100644 --- a/setup.cfg +++ b/setup.cfg @@ -107,6 +107,7 @@ console_scripts = quantum-server = neutron.server:main quantum-rootwrap = neutron.openstack.common.rootwrap.cmd:main quantum-usage-audit = neutron.cmd.usage_audit:main + neutron-metering-agent = neutron.services.metering.agents.metering_agent:main neutron.ml2.type_drivers = flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver