From 02f630b6169a2991f8e66d611b4335bcbb714832 Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Tue, 24 Jan 2017 18:18:26 +0200 Subject: [PATCH] NSX-V3| Qos without RPC notifications The QoS implementation will stop using the RPC notifications in Pike. See commit I2f166a43f0b980ad22617f8a3f7b4cc7f4786c48 This patch still supports it for backwards compatibility, as well as a new driver to replace the RPC notifications. Change-Id: I1f863bf91f712d4b12db753b13cc6b842b6918a4 --- doc/source/devstack.rst | 2 +- vmware_nsx/plugins/nsx_v/plugin.py | 14 ++-- vmware_nsx/plugins/nsx_v3/plugin.py | 33 ++++++-- vmware_nsx/services/qos/nsx_v3/driver.py | 82 +++++++++++++++++++ .../services/qos/nsx_v3/message_queue.py | 7 +- vmware_nsx/services/qos/nsx_v3/utils.py | 8 +- .../admin/plugins/nsxv3/resources/utils.py | 3 + .../tests/unit/services/qos/fake_notifier.py | 38 --------- .../services/qos/test_nsxv3_notification.py | 12 ++- .../services/qos/test_nsxv_notification.py | 10 +-- 10 files changed, 135 insertions(+), 74 deletions(-) create mode 100644 vmware_nsx/services/qos/nsx_v3/driver.py delete mode 100644 vmware_nsx/tests/unit/services/qos/fake_notifier.py diff --git a/doc/source/devstack.rst b/doc/source/devstack.rst index 84777b46dc..9894ee25b8 100644 --- a/doc/source/devstack.rst +++ b/doc/source/devstack.rst @@ -105,7 +105,7 @@ Enable the qos in ``local.conf``:: service_plugins = neutron.services.qos.qos_plugin.QoSPlugin [qos] - notification_drivers = vmware_nsxv3_message_queue + notification_drivers = Optional: Update the nsx qos_peak_bw_multiplier in nsx.ini (default value is 2.0):: diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 338c65c266..3465791572 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -373,11 +373,15 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, self.conn.create_consumer(self.topic, self.endpoints, fanout=False) # Add QoS - qos_topic = resources_rpc.resource_type_versioned_topic( - callbacks_resources.QOS_POLICY) - self.conn.create_consumer( - qos_topic, [resources_rpc.ResourcesPushRpcCallback()], - fanout=False) + qos_plugin = directory.get_plugin(plugin_const.QOS) + if (qos_plugin and qos_plugin.driver_manager and + qos_plugin.driver_manager.rpc_notifications_required): + # TODO(asarfaty) this option should be deprecated on Pike + qos_topic = resources_rpc.resource_type_versioned_topic( + callbacks_resources.QOS_POLICY) + self.conn.create_consumer( + qos_topic, [resources_rpc.ResourcesPushRpcCallback()], + fanout=False) self.start_rpc_listeners_called = True return self.conn.consume_in_threads() diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index 53351c22c6..0cc81856d2 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -66,6 +66,7 @@ from neutron_lib.api.definitions import provider_net as pnet from neutron_lib.api import validators from neutron_lib import constants as const from neutron_lib import exceptions as n_exc +from neutron_lib.plugins import directory from neutron_lib.utils import helpers from oslo_config import cfg from oslo_db import exception as db_exc @@ -95,6 +96,7 @@ from vmware_nsx.extensions import securitygrouplogging as sg_logging from vmware_nsx.plugins.nsx_v3 import cert_utils from vmware_nsx.plugins.nsx_v3 import utils as v3_utils from vmware_nsx.services.qos.common import utils as qos_com_utils +from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver from vmware_nsxlib.v3 import client_cert @@ -228,9 +230,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, ) % NSX_V3_EXCLUDED_PORT_NSGROUP_NAME raise nsx_exc.NsxPluginException(err_msg=msg) - # Bind QoS notifications - callbacks_registry.register(qos_utils.handle_qos_notification, - callbacks_resources.QOS_POLICY) + self._init_qos_callbacks() + self.start_rpc_listeners_called = False self._unsubscribe_callback_events() @@ -514,6 +515,20 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, cfg.CONF.nsx_v3.log_security_groups_blocked_traffic) return section_id + def _init_qos_callbacks(self): + # Bind QoS notifications. the RPC option will be deprecated soon, + # but for now we need to support both options + qos_plugin = directory.get_plugin(plugin_const.QOS) + if (qos_plugin and qos_plugin.driver_manager and + qos_plugin.driver_manager.rpc_notifications_required): + # TODO(asarfaty) this option should be deprecated on Pike + self.qos_use_rpc = True + callbacks_registry.register(qos_utils.handle_qos_notification, + callbacks_resources.QOS_POLICY) + else: + self.qos_use_rpc = False + qos_driver.register() + def _init_dhcp_metadata(self): if cfg.CONF.nsx_v3.native_dhcp_metadata: if cfg.CONF.dhcp_agent_notification: @@ -578,11 +593,13 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, self.conn.create_consumer(topics.REPORTS, [agents_db.AgentExtRpcCallback()], fanout=False) - qos_topic = resources_rpc.resource_type_versioned_topic( - callbacks_resources.QOS_POLICY) - self.conn.create_consumer(qos_topic, - [resources_rpc.ResourcesPushRpcCallback()], - fanout=False) + if self.qos_use_rpc: + qos_topic = resources_rpc.resource_type_versioned_topic( + callbacks_resources.QOS_POLICY) + self.conn.create_consumer( + qos_topic, + [resources_rpc.ResourcesPushRpcCallback()], + fanout=False) self.start_rpc_listeners_called = True return self.conn.consume_in_threads() diff --git a/vmware_nsx/services/qos/nsx_v3/driver.py b/vmware_nsx/services/qos/nsx_v3/driver.py new file mode 100644 index 0000000000..715411be70 --- /dev/null +++ b/vmware_nsx/services/qos/nsx_v3/driver.py @@ -0,0 +1,82 @@ +# Copyright 2017 VMware, Inc. +# +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from neutron.services.qos.drivers import base +from neutron.services.qos import qos_consts + +from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils + +LOG = logging.getLogger(__name__) + +DRIVER = None + +SUPPORTED_RULES = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT, + qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH] + + +class NSXv3QosDriver(base.DriverBase): + + @staticmethod + def create(): + return NSXv3QosDriver( + name='NSXv3QosDriver', + vif_types=None, + vnic_types=None, + supported_rules=SUPPORTED_RULES, + requires_rpc_notifications=False) + + def __init__(self, **kwargs): + self.handler = qos_utils.QosNotificationsHandler() + super(NSXv3QosDriver, self).__init__(**kwargs) + + def is_vif_type_compatible(self, vif_type): + return True + + def is_vnic_compatible(self, vnic_type): + return True + + def create_policy(self, context, policy): + self.handler.create_policy(context, policy) + + def update_policy(self, context, policy): + if (hasattr(policy, "rules")): + # we may have up to 1 rule of each type + bw_rule = None + dscp_rule = None + for rule in policy["rules"]: + if rule.rule_type == qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: + bw_rule = rule + else: + dscp_rule = rule + + self.handler.update_policy_rules( + context, policy.id, bw_rule, dscp_rule) + + # May also need to update name / description + self.handler.update_policy(context, policy.id, policy) + + def delete_policy(self, context, policy): + self.handler.delete_policy(context, policy.id) + + +def register(): + """Register the NSX-V3 QoS driver.""" + global DRIVER + if not DRIVER: + DRIVER = NSXv3QosDriver.create() + LOG.debug('NSXv3QosDriver QoS driver registered') diff --git a/vmware_nsx/services/qos/nsx_v3/message_queue.py b/vmware_nsx/services/qos/nsx_v3/message_queue.py index 908ff7c13d..af19b90075 100644 --- a/vmware_nsx/services/qos/nsx_v3/message_queue.py +++ b/vmware_nsx/services/qos/nsx_v3/message_queue.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron.api.rpc.callbacks import events from neutron.services.qos.notification_drivers import message_queue @@ -24,6 +23,6 @@ class NsxV3QosNotificationDriver( Overriding the create_policy method in order to add a notification message in this case too. """ - - def create_policy(self, context, policy): - self.notification_api.push(context, [policy], events.CREATED) + # The message queue is no longer needed in Pike. + # Keeping this class for a while for existing configurations. + pass diff --git a/vmware_nsx/services/qos/nsx_v3/utils.py b/vmware_nsx/services/qos/nsx_v3/utils.py index 1b950f9d37..9124e3d4a6 100644 --- a/vmware_nsx/services/qos/nsx_v3/utils.py +++ b/vmware_nsx/services/qos/nsx_v3/utils.py @@ -34,6 +34,8 @@ MAX_KBPS_MIN_VALUE = 1024 MAX_BURST_MAX_VALUE = int((2 ** 31 - 1) / 128) +#TODO(asarfaty): QoS usage of RPC will be deprecated on Pike, and the driver +# code will be used instead. For now - we need to support both. def handle_qos_notification(context, resource_type, policies_list, event_type): for policy_obj in policies_list: @@ -66,9 +68,9 @@ def handle_qos_policy_notification(context, policy_obj, event_type): handler.update_policy_rules( context, policy_obj.id, bw_rule, dscp_rule) - else: - # Without rules - need to update only name / description - handler.update_policy(context, policy_obj.id, policy) + + # May also need to update name / description + handler.update_policy(context, policy_obj.id, policy) elif (event_type == callbacks_events.DELETED): handler.delete_policy(context, policy_obj.id) diff --git a/vmware_nsx/shell/admin/plugins/nsxv3/resources/utils.py b/vmware_nsx/shell/admin/plugins/nsxv3/resources/utils.py index 6f2f1bf93f..f361c42a54 100644 --- a/vmware_nsx/shell/admin/plugins/nsxv3/resources/utils.py +++ b/vmware_nsx/shell/admin/plugins/nsxv3/resources/utils.py @@ -94,6 +94,9 @@ class NsxV3PluginWrapper(plugin.NsxV3Plugin): super(NsxV3PluginWrapper, self).__init__() self.context = context.get_admin_context() + def _init_qos_callbacks(self): + self.qos_use_rpc = False + def _init_dhcp_metadata(self): pass diff --git a/vmware_nsx/tests/unit/services/qos/fake_notifier.py b/vmware_nsx/tests/unit/services/qos/fake_notifier.py deleted file mode 100644 index e6038336b1..0000000000 --- a/vmware_nsx/tests/unit/services/qos/fake_notifier.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2016 VMware, Inc. -# All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.api.rpc.callbacks import events -from neutron.api.rpc.callbacks import resources -from neutron.services.qos.notification_drivers import message_queue -from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils - - -class DummyNotificationDriver( - message_queue.RpcQosServiceNotificationDriver): - - def create_policy(self, context, policy): - qos_utils.handle_qos_notification( - context, resources.QOS_POLICY, - [policy], events.CREATED) - - def update_policy(self, context, policy): - qos_utils.handle_qos_notification( - context, resources.QOS_POLICY, - [policy], events.UPDATED) - - def delete_policy(self, context, policy): - qos_utils.handle_qos_notification( - context, resources.QOS_POLICY, - [policy], events.DELETED) diff --git a/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py b/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py index 6e3ad656ed..e9e823fe35 100644 --- a/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py +++ b/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py @@ -26,6 +26,7 @@ from neutron.tests.unit.services.qos import base from vmware_nsx.db import db as nsx_db from vmware_nsx.plugins.nsx_v3 import utils as v3_utils +from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils from vmware_nsx.tests.unit.nsx_v3 import test_plugin @@ -36,16 +37,13 @@ class TestQosNsxV3Notification(base.BaseQosTestCase, test_plugin.NsxV3PluginTestCaseMixin): def setUp(self): + # Add a dummy notification driver - should be removed in Pike + cfg.CONF.set_override("notification_drivers", [], "qos") + # Reset the drive to re-create it + qos_driver.DRIVER = None super(TestQosNsxV3Notification, self).setUp() self.setup_coreplugin(PLUGIN_NAME) - # Add a dummy notification driver that calls our handler directly - # (to skip the message queue) - cfg.CONF.set_override( - "notification_drivers", - ['vmware_nsx.tests.unit.services.qos.fake_notifier.' - 'DummyNotificationDriver'], - "qos") self.qos_plugin = qos_plugin.QoSPlugin() self.ctxt = context.Context('fake_user', 'fake_tenant') mock.patch.object(self.ctxt.session, 'refresh').start() diff --git a/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py b/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py index e072421d34..d551d06d44 100644 --- a/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py +++ b/vmware_nsx/tests/unit/services/qos/test_nsxv_notification.py @@ -42,20 +42,14 @@ class TestQosNsxVNotification(test_plugin.NsxVPluginV2TestCase, def setUp(self, *mocks): # init the nsx-v plugin for testing with DVS self._init_dvs_config() + # Add a dummy notification driver - should be removed in Pike + cfg.CONF.set_override("notification_drivers", [], "qos") super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN, ext_mgr=None) plugin_instance = directory.get_plugin() self._core_plugin = plugin_instance self._core_plugin.init_is_complete = True - # Setup the QoS plugin: - # Add a dummy notification driver that calls our handler directly - # (to skip the message queue) - cfg.CONF.set_override( - "notification_drivers", - ['vmware_nsx.tests.unit.services.qos.fake_nsxv_notifier.' - 'DummyNsxVNotificationDriver'], - "qos") self.qos_plugin = qos_plugin.QoSPlugin() mock.patch.object(qos_utils.NsxVQosRule, '_get_qos_plugin',