NSX-V3| Qos without RPC notifications

The QoS implementation will stop using the RPC notifications in Pike.
See commit I2f166a43f0b980ad22617f8a3f7b4cc7f4786c48
This patch still supports it for backwards compatibility,
as well as a new driver to replace the RPC notifications.

Change-Id: I1f863bf91f712d4b12db753b13cc6b842b6918a4
This commit is contained in:
Adit Sarfaty 2017-01-24 18:18:26 +02:00
parent 5aca58fc08
commit 02f630b616
10 changed files with 135 additions and 74 deletions

View File

@ -105,7 +105,7 @@ Enable the qos in ``local.conf``::
service_plugins = neutron.services.qos.qos_plugin.QoSPlugin
[qos]
notification_drivers = vmware_nsxv3_message_queue
notification_drivers =
Optional: Update the nsx qos_peak_bw_multiplier in nsx.ini (default value is 2.0)::

View File

@ -373,11 +373,15 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
# Add QoS
qos_topic = resources_rpc.resource_type_versioned_topic(
callbacks_resources.QOS_POLICY)
self.conn.create_consumer(
qos_topic, [resources_rpc.ResourcesPushRpcCallback()],
fanout=False)
qos_plugin = directory.get_plugin(plugin_const.QOS)
if (qos_plugin and qos_plugin.driver_manager and
qos_plugin.driver_manager.rpc_notifications_required):
# TODO(asarfaty) this option should be deprecated on Pike
qos_topic = resources_rpc.resource_type_versioned_topic(
callbacks_resources.QOS_POLICY)
self.conn.create_consumer(
qos_topic, [resources_rpc.ResourcesPushRpcCallback()],
fanout=False)
self.start_rpc_listeners_called = True
return self.conn.consume_in_threads()

View File

@ -66,6 +66,7 @@ from neutron_lib.api.definitions import provider_net as pnet
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from neutron_lib.utils import helpers
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -95,6 +96,7 @@ from vmware_nsx.extensions import securitygrouplogging as sg_logging
from vmware_nsx.plugins.nsx_v3 import cert_utils
from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsx.services.qos.common import utils as qos_com_utils
from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver
from vmware_nsxlib.v3 import client_cert
@ -228,9 +230,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
) % NSX_V3_EXCLUDED_PORT_NSGROUP_NAME
raise nsx_exc.NsxPluginException(err_msg=msg)
# Bind QoS notifications
callbacks_registry.register(qos_utils.handle_qos_notification,
callbacks_resources.QOS_POLICY)
self._init_qos_callbacks()
self.start_rpc_listeners_called = False
self._unsubscribe_callback_events()
@ -514,6 +515,20 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
cfg.CONF.nsx_v3.log_security_groups_blocked_traffic)
return section_id
def _init_qos_callbacks(self):
# Bind QoS notifications. the RPC option will be deprecated soon,
# but for now we need to support both options
qos_plugin = directory.get_plugin(plugin_const.QOS)
if (qos_plugin and qos_plugin.driver_manager and
qos_plugin.driver_manager.rpc_notifications_required):
# TODO(asarfaty) this option should be deprecated on Pike
self.qos_use_rpc = True
callbacks_registry.register(qos_utils.handle_qos_notification,
callbacks_resources.QOS_POLICY)
else:
self.qos_use_rpc = False
qos_driver.register()
def _init_dhcp_metadata(self):
if cfg.CONF.nsx_v3.native_dhcp_metadata:
if cfg.CONF.dhcp_agent_notification:
@ -578,11 +593,13 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
qos_topic = resources_rpc.resource_type_versioned_topic(
callbacks_resources.QOS_POLICY)
self.conn.create_consumer(qos_topic,
[resources_rpc.ResourcesPushRpcCallback()],
fanout=False)
if self.qos_use_rpc:
qos_topic = resources_rpc.resource_type_versioned_topic(
callbacks_resources.QOS_POLICY)
self.conn.create_consumer(
qos_topic,
[resources_rpc.ResourcesPushRpcCallback()],
fanout=False)
self.start_rpc_listeners_called = True
return self.conn.consume_in_threads()

View File

@ -0,0 +1,82 @@
# Copyright 2017 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.services.qos.drivers import base
from neutron.services.qos import qos_consts
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
LOG = logging.getLogger(__name__)
DRIVER = None
SUPPORTED_RULES = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH]
class NSXv3QosDriver(base.DriverBase):
@staticmethod
def create():
return NSXv3QosDriver(
name='NSXv3QosDriver',
vif_types=None,
vnic_types=None,
supported_rules=SUPPORTED_RULES,
requires_rpc_notifications=False)
def __init__(self, **kwargs):
self.handler = qos_utils.QosNotificationsHandler()
super(NSXv3QosDriver, self).__init__(**kwargs)
def is_vif_type_compatible(self, vif_type):
return True
def is_vnic_compatible(self, vnic_type):
return True
def create_policy(self, context, policy):
self.handler.create_policy(context, policy)
def update_policy(self, context, policy):
if (hasattr(policy, "rules")):
# we may have up to 1 rule of each type
bw_rule = None
dscp_rule = None
for rule in policy["rules"]:
if rule.rule_type == qos_consts.RULE_TYPE_BANDWIDTH_LIMIT:
bw_rule = rule
else:
dscp_rule = rule
self.handler.update_policy_rules(
context, policy.id, bw_rule, dscp_rule)
# May also need to update name / description
self.handler.update_policy(context, policy.id, policy)
def delete_policy(self, context, policy):
self.handler.delete_policy(context, policy.id)
def register():
"""Register the NSX-V3 QoS driver."""
global DRIVER
if not DRIVER:
DRIVER = NSXv3QosDriver.create()
LOG.debug('NSXv3QosDriver QoS driver registered')

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import events
from neutron.services.qos.notification_drivers import message_queue
@ -24,6 +23,6 @@ class NsxV3QosNotificationDriver(
Overriding the create_policy method in order to add a notification
message in this case too.
"""
def create_policy(self, context, policy):
self.notification_api.push(context, [policy], events.CREATED)
# The message queue is no longer needed in Pike.
# Keeping this class for a while for existing configurations.
pass

View File

@ -34,6 +34,8 @@ MAX_KBPS_MIN_VALUE = 1024
MAX_BURST_MAX_VALUE = int((2 ** 31 - 1) / 128)
#TODO(asarfaty): QoS usage of RPC will be deprecated on Pike, and the driver
# code will be used instead. For now - we need to support both.
def handle_qos_notification(context, resource_type, policies_list,
event_type):
for policy_obj in policies_list:
@ -66,9 +68,9 @@ def handle_qos_policy_notification(context, policy_obj, event_type):
handler.update_policy_rules(
context, policy_obj.id, bw_rule, dscp_rule)
else:
# Without rules - need to update only name / description
handler.update_policy(context, policy_obj.id, policy)
# May also need to update name / description
handler.update_policy(context, policy_obj.id, policy)
elif (event_type == callbacks_events.DELETED):
handler.delete_policy(context, policy_obj.id)

View File

@ -94,6 +94,9 @@ class NsxV3PluginWrapper(plugin.NsxV3Plugin):
super(NsxV3PluginWrapper, self).__init__()
self.context = context.get_admin_context()
def _init_qos_callbacks(self):
self.qos_use_rpc = False
def _init_dhcp_metadata(self):
pass

View File

@ -1,38 +0,0 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.services.qos.notification_drivers import message_queue
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
class DummyNotificationDriver(
message_queue.RpcQosServiceNotificationDriver):
def create_policy(self, context, policy):
qos_utils.handle_qos_notification(
context, resources.QOS_POLICY,
[policy], events.CREATED)
def update_policy(self, context, policy):
qos_utils.handle_qos_notification(
context, resources.QOS_POLICY,
[policy], events.UPDATED)
def delete_policy(self, context, policy):
qos_utils.handle_qos_notification(
context, resources.QOS_POLICY,
[policy], events.DELETED)

View File

@ -26,6 +26,7 @@ from neutron.tests.unit.services.qos import base
from vmware_nsx.db import db as nsx_db
from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
from vmware_nsx.tests.unit.nsx_v3 import test_plugin
@ -36,16 +37,13 @@ class TestQosNsxV3Notification(base.BaseQosTestCase,
test_plugin.NsxV3PluginTestCaseMixin):
def setUp(self):
# Add a dummy notification driver - should be removed in Pike
cfg.CONF.set_override("notification_drivers", [], "qos")
# Reset the drive to re-create it
qos_driver.DRIVER = None
super(TestQosNsxV3Notification, self).setUp()
self.setup_coreplugin(PLUGIN_NAME)
# Add a dummy notification driver that calls our handler directly
# (to skip the message queue)
cfg.CONF.set_override(
"notification_drivers",
['vmware_nsx.tests.unit.services.qos.fake_notifier.'
'DummyNotificationDriver'],
"qos")
self.qos_plugin = qos_plugin.QoSPlugin()
self.ctxt = context.Context('fake_user', 'fake_tenant')
mock.patch.object(self.ctxt.session, 'refresh').start()

View File

@ -42,20 +42,14 @@ class TestQosNsxVNotification(test_plugin.NsxVPluginV2TestCase,
def setUp(self, *mocks):
# init the nsx-v plugin for testing with DVS
self._init_dvs_config()
# Add a dummy notification driver - should be removed in Pike
cfg.CONF.set_override("notification_drivers", [], "qos")
super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN,
ext_mgr=None)
plugin_instance = directory.get_plugin()
self._core_plugin = plugin_instance
self._core_plugin.init_is_complete = True
# Setup the QoS plugin:
# Add a dummy notification driver that calls our handler directly
# (to skip the message queue)
cfg.CONF.set_override(
"notification_drivers",
['vmware_nsx.tests.unit.services.qos.fake_nsxv_notifier.'
'DummyNsxVNotificationDriver'],
"qos")
self.qos_plugin = qos_plugin.QoSPlugin()
mock.patch.object(qos_utils.NsxVQosRule,
'_get_qos_plugin',