From 6fbf7ff64c70e93e3b11ee80681d786ea071c6b7 Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Wed, 23 Mar 2016 11:56:44 +0200 Subject: [PATCH] NSX|V3 add qos support for ports Add support for the qos service in NSX|V3, including: - Attach/Detach qos policy to a new or updated port - Allow qos configuration on ports with internal networks only - Update the switch profile with the BW limitations and tags through the qos notification driver - Add a new mapping db table to link the policy id and the nsx-v3 switch profile id For this to work, the following configuration should appear under the 'qos' section in the neutron.conf: notification_drivers = vmware_nsxv3_message_queue Change-Id: I4016de756cebe0032e61d3c2a5250527e44b49e4 --- setup.cfg | 2 + vmware_nsx/common/exceptions.py | 9 + vmware_nsx/db/db.py | 19 ++ .../alembic_migrations/versions/EXPAND_HEAD | 2 +- .../b7f41687cbad_nsxv3_qos_policy_mapping.py | 37 ++++ vmware_nsx/db/nsx_models.py | 10 + vmware_nsx/nsxlib/v3/__init__.py | 80 +++++++- vmware_nsx/plugins/nsx_v3/plugin.py | 155 +++++++++++++-- .../services/qos/nsx_v3/message_queue.py | 29 +++ vmware_nsx/services/qos/nsx_v3/utils.py | 181 +++++++++++++++++ vmware_nsx/tests/unit/nsx_v3/test_plugin.py | 53 +++++ .../nsxlib/v3/test_qos_switching_profile.py | 114 ++++++++++- .../tests/unit/services/qos/fake_notifier.py | 31 +++ .../services/qos/test_nsxv3_notification.py | 182 ++++++++++++++++++ 14 files changed, 877 insertions(+), 27 deletions(-) create mode 100644 vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/b7f41687cbad_nsxv3_qos_policy_mapping.py create mode 100644 vmware_nsx/services/qos/nsx_v3/message_queue.py create mode 100644 vmware_nsx/services/qos/nsx_v3/utils.py create mode 100644 vmware_nsx/tests/unit/services/qos/fake_notifier.py create mode 100644 vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py diff --git a/setup.cfg b/setup.cfg index 008f9b2144..e00e419427 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,8 @@ neutron.core_plugins = neutron.service_plugins = vmware_nsx_l2gw = vmware_nsx.services.l2gateway.common.plugin:NsxL2GatewayPlugin vmware_nsxv_qos = vmware_nsx.services.qos.nsx_v.plugin:NsxVQosPlugin +neutron.qos.notification_drivers = + vmware_nsxv3_message_queue = vmware_nsx.services.qos.nsx_v3.message_queue:NsxV3QosNotificationDriver vmware_nsx.neutron.nsxv.router_type_drivers = shared = vmware_nsx.plugins.nsx_v.drivers.shared_router_driver:RouterSharedDriver distributed = vmware_nsx.plugins.nsx_v.drivers.distributed_router_driver:RouterDistributedDriver diff --git a/vmware_nsx/common/exceptions.py b/vmware_nsx/common/exceptions.py index e210474fbd..8748d0f763 100644 --- a/vmware_nsx/common/exceptions.py +++ b/vmware_nsx/common/exceptions.py @@ -171,3 +171,12 @@ class SecurityGroupMaximumCapacityReached(NsxPluginException): class NsxResourceNotFound(n_exc.NotFound): message = _("%(res_name)s %(res_id)s not found on the backend.") + + +class NsxQosPolicyMappingNotFound(n_exc.NotFound): + message = _('Unable to find mapping for QoS policy: %(policy)s') + + +class NsxQosSmallBw(n_exc.InvalidInput): + message = _("Invalid input for max_kbps. Reason: The minimal legal value " + "for max_kbps is 1024") diff --git a/vmware_nsx/db/db.py b/vmware_nsx/db/db.py index cac071c8c9..49e68fa0a4 100644 --- a/vmware_nsx/db/db.py +++ b/vmware_nsx/db/db.py @@ -238,3 +238,22 @@ def get_l2gw_connection_mapping(session, connection_id): filter_by(connection_id=connection_id).one()) except exc.NoResultFound: raise nsx_exc.NsxL2GWConnectionMappingNotFound(conn=connection_id) + + +# NSXv3 QoS policy id <-> switch Id mapping +def add_qos_policy_profile_mapping(session, qos_policy_id, switch_profile_id): + with session.begin(subtransactions=True): + mapping = nsx_models.QosPolicySwitchProfile( + qos_policy_id=qos_policy_id, + switch_profile_id=switch_profile_id) + session.add(mapping) + return mapping + + +def get_switch_profile_by_qos_policy(session, qos_policy_id): + try: + entry = (session.query(nsx_models.QosPolicySwitchProfile). + filter_by(qos_policy_id=qos_policy_id).one()) + return entry.switch_profile_id + except exc.NoResultFound: + raise nsx_exc.NsxQosPolicyMappingNotFound(policy=qos_policy_id) diff --git a/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD b/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD index 1c5494b0e9..2c1878b6f1 100644 --- a/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/vmware_nsx/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -967462f585e1 +b7f41687cbad diff --git a/vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/b7f41687cbad_nsxv3_qos_policy_mapping.py b/vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/b7f41687cbad_nsxv3_qos_policy_mapping.py new file mode 100644 index 0000000000..aaa6391e8d --- /dev/null +++ b/vmware_nsx/db/migration/alembic_migrations/versions/newton/expand/b7f41687cbad_nsxv3_qos_policy_mapping.py @@ -0,0 +1,37 @@ +# Copyright 2016 VMware, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""nsxv3_qos_policy_mapping + +Revision ID: b7f41687cbad +Revises: 967462f585e1 +Create Date: 2016-03-17 06:12:09.450116 +""" + +# revision identifiers, used by Alembic. +revision = 'b7f41687cbad' +down_revision = '967462f585e1' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'neutron_nsx_qos_policy_mappings', + sa.Column('qos_policy_id', sa.String(36), nullable=False), + sa.Column('switch_profile_id', sa.String(36), nullable=False), + sa.ForeignKeyConstraint(['qos_policy_id'], ['qos_policies.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('qos_policy_id')) diff --git a/vmware_nsx/db/nsx_models.py b/vmware_nsx/db/nsx_models.py index 6b473bb3ff..700229fb29 100644 --- a/vmware_nsx/db/nsx_models.py +++ b/vmware_nsx/db/nsx_models.py @@ -319,3 +319,13 @@ class NsxL2GWConnectionMapping(model_base.BASEV2): sa.ForeignKey("ports.id", ondelete="CASCADE"), nullable=False) bridge_endpoint_id = sa.Column(sa.String(36), nullable=False) + + +class QosPolicySwitchProfile(model_base.BASEV2): + # Maps neutron qos policy identifiers to NSX-V3 switch profile identifiers + __tablename__ = 'neutron_nsx_qos_policy_mappings' + qos_policy_id = sa.Column(sa.String(36), + sa.ForeignKey('qos_policies.id', + ondelete='CASCADE'), + primary_key=True) + switch_profile_id = sa.Column(sa.String(36), nullable=False) diff --git a/vmware_nsx/nsxlib/v3/__init__.py b/vmware_nsx/nsxlib/v3/__init__.py index 71ad55cea9..728a550d76 100644 --- a/vmware_nsx/nsxlib/v3/__init__.py +++ b/vmware_nsx/nsxlib/v3/__init__.py @@ -178,12 +178,18 @@ def update_logical_router_advertisement(logical_router_id, **kwargs): return update_resource_with_retry(resource, kwargs) -def create_qos_switching_profile(tags, qos_marking=None, dscp=None, name=None, - description=None): - resource = 'switching-profiles' +def _build_qos_switching_profile_args(tags, qos_marking=None, dscp=None, + name=None, description=None): body = {"resource_type": "QosSwitchingProfile", "tags": tags} - # TODO(abhide): Add TrafficShaper configuration. + + return _update_qos_switching_profile_args( + body, qos_marking=qos_marking, dscp=dscp, + name=name, description=description) + + +def _update_qos_switching_profile_args(body, qos_marking=None, dscp=None, + name=None, description=None): if qos_marking: body["dscp"] = {} body["dscp"]["mode"] = qos_marking.upper() @@ -193,9 +199,75 @@ def create_qos_switching_profile(tags, qos_marking=None, dscp=None, name=None, body["display_name"] = name if description: body["description"] = description + return body + + +def _enable_shaping_in_args(body, burst_size=None, peak_bandwidth=None, + average_bandwidth=None): + for shaper in body["shaper_configuration"]: + # Neutron currently supports only shaping of Egress traffic + if shaper["resource_type"] == "EgressRateShaper": + shaper["enabled"] = True + if burst_size: + shaper["burst_size_bytes"] = burst_size + if peak_bandwidth: + shaper["peak_bandwidth_mbps"] = peak_bandwidth + if average_bandwidth: + shaper["average_bandwidth_mbps"] = average_bandwidth + break + + return body + + +def _disable_shaping_in_args(body): + for shaper in body["shaper_configuration"]: + # Neutron currently supports only shaping of Egress traffic + if shaper["resource_type"] == "EgressRateShaper": + shaper["enabled"] = False + shaper["burst_size_bytes"] = 0 + shaper["peak_bandwidth_mbps"] = 0 + shaper["average_bandwidth_mbps"] = 0 + break + + return body + + +def create_qos_switching_profile(tags, qos_marking=None, dscp=None, name=None, + description=None): + resource = 'switching-profiles' + body = _build_qos_switching_profile_args(tags, qos_marking, dscp, + name, description) return client.create_resource(resource, body) +def update_qos_switching_profile(profile_id, tags, qos_marking=None, + dscp=None, name=None, description=None): + resource = 'switching-profiles/%s' % profile_id + # get the current configuration + body = get_qos_switching_profile(profile_id) + # update the relevant fields + body = _update_qos_switching_profile_args(body, qos_marking, dscp, + name, description) + return update_resource_with_retry(resource, body) + + +def update_qos_switching_profile_shaping(profile_id, shaping_enabled=False, + burst_size=None, peak_bandwidth=None, + average_bandwidth=None): + resource = 'switching-profiles/%s' % profile_id + # get the current configuration + body = get_qos_switching_profile(profile_id) + # update the relevant fields + if shaping_enabled: + body = _enable_shaping_in_args(body, + burst_size=burst_size, + peak_bandwidth=peak_bandwidth, + average_bandwidth=average_bandwidth) + else: + body = _disable_shaping_in_args(body) + return update_resource_with_retry(resource, body) + + def get_qos_switching_profile(profile_id): resource = 'switching-profiles/%s' % profile_id return client.get_resource(resource) diff --git a/vmware_nsx/plugins/nsx_v3/plugin.py b/vmware_nsx/plugins/nsx_v3/plugin.py index c240a36755..2d4b1cf523 100644 --- a/vmware_nsx/plugins/nsx_v3/plugin.py +++ b/vmware_nsx/plugins/nsx_v3/plugin.py @@ -16,8 +16,11 @@ import netaddr import six from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api +from neutron.api.rpc.callbacks.consumer import registry as callbacks_registry +from neutron.api.rpc.callbacks import resources as callbacks_resources from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import metadata_rpc +from neutron.api.rpc.handlers import resources_rpc from neutron.api.v2 import attributes from neutron.callbacks import events from neutron.callbacks import exceptions as callback_exc @@ -53,6 +56,7 @@ from neutron.extensions import securitygroup as ext_sg from neutron.plugins.common import constants as plugin_const from neutron.plugins.common import utils as n_utils from neutron.quota import resource_registry +from neutron.services.qos import qos_consts from neutron_lib import constants as const from neutron_lib import exceptions as n_exc from oslo_config import cfg @@ -79,6 +83,7 @@ from vmware_nsx.nsxlib.v3 import dfw_api as firewall from vmware_nsx.nsxlib.v3 import resources as nsx_resources from vmware_nsx.nsxlib.v3 import router from vmware_nsx.nsxlib.v3 import security +from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils LOG = log.getLogger(__name__) @@ -87,7 +92,10 @@ NSX_V3_NO_PSEC_PROFILE_NAME = 'nsx-default-spoof-guard-vif-profile' NSX_V3_DHCP_PROFILE_NAME = 'neutron_port_dhcp_profile' -class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, +# NOTE(asarfaty): the order of inheritance here is important. in order for the +# QoS notification to work, the AgentScheduler init must be called first +class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, + addr_pair_db.AllowedAddressPairsMixin, db_base_plugin_v2.NeutronDbPluginV2, extend_sg_rule.ExtendedSecurityGroupRuleMixin, securitygroups_db.SecurityGroupDbMixin, @@ -97,7 +105,6 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, l3_gwmode_db.L3_NAT_db_mixin, portbindings_db.PortBindingMixin, portsecurity_db.PortSecurityDbMixin, - agentschedulers_db.AZDhcpAgentSchedulerDbMixin, extradhcpopt_db.ExtraDhcpOptMixin, dns_db.DNSDbMixin): @@ -170,6 +177,12 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, self._switching_profiles, self._switching_profiles.find_by_display_name( NSX_V3_NO_PSEC_PROFILE_NAME)[0])[0] + + # Bind QoS notifications + callbacks_registry.subscribe(qos_utils.handle_qos_notification, + callbacks_resources.QOS_POLICY) + self.start_rpc_listeners_called = False + LOG.debug("Initializing NSX v3 DHCP switching profile") self._dhcp_profile = None try: @@ -333,6 +346,10 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, ) def start_rpc_listeners(self): + if self.start_rpc_listeners_called: + # If called more than once - we should not create it again + return self.conn.consume_in_threads() + self._setup_rpc() self.topic = topics.PLUGIN self.conn = n_rpc.create_connection() @@ -340,6 +357,13 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, self.conn.create_consumer(topics.REPORTS, [agents_db.AgentExtRpcCallback()], fanout=False) + qos_topic = resources_rpc.resource_type_versioned_topic( + callbacks_resources.QOS_POLICY) + self.conn.create_consumer(qos_topic, + [resources_rpc.ResourcesPushRpcCallback()], + fanout=False) + self.start_rpc_listeners_called = True + return self.conn.consume_in_threads() def _validate_provider_create(self, context, network_data): @@ -483,8 +507,18 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, network[pnet.PHYSICAL_NETWORK] = bindings[0].phy_uuid network[pnet.SEGMENTATION_ID] = bindings[0].vlan_id + # NSX-V3 networks cannot be associated with QoS policies + def _validate_no_qos(self, net_data): + err_msg = None + if attributes.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)): + err_msg = _("Cannot configure QOS on networks") + + if err_msg: + raise n_exc.InvalidInput(error_message=err_msg) + def create_network(self, context, network): net_data = network['network'] + self._validate_no_qos(net_data) external = net_data.get(ext_net_extn.EXTERNAL) is_backend_network = False if attributes.is_attr_set(external) and external: @@ -621,6 +655,7 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, def update_network(self, context, id, network): original_net = super(NsxV3Plugin, self).get_network(context, id) net_data = network['network'] + self._validate_no_qos(net_data) # Neutron does not support changing provider network values pnet._raise_if_updates_provider_attributes(net_data) updated_net = super(NsxV3Plugin, self).update_network(context, id, @@ -772,6 +807,21 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, name = port_data['name'] return name + def _get_qos_profile_id(self, context, policy_id): + switch_profile_id = nsx_db.get_switch_profile_by_qos_policy( + context.session, policy_id) + qos_profile = nsxlib.get_qos_switching_profile(switch_profile_id) + if qos_profile: + profile_ids = self._switching_profiles.build_switch_profile_ids( + self._switching_profiles, qos_profile) + if profile_ids and len(profile_ids) > 0: + # We have only 1 QoS profile, so this array is of size 1 + return profile_ids[0] + # Didn't find it + err_msg = _("Could not find QoS switching profile for policy " + "%s") % policy_id + raise n_exc.InvalidInput(error_message=err_msg) + def _create_port_at_the_backend(self, context, port_data, l2gw_port_check, psec_is_on): device_owner = port_data.get('device_owner') @@ -812,19 +862,39 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, if device_owner == const.DEVICE_OWNER_DHCP: profiles.append(self._dhcp_profile) + # Add QoS switching profile, if exists + qos_policy_id = None + if attributes.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)): + qos_policy_id = port_data[qos_consts.QOS_POLICY_ID] + qos_profile_id = self._get_qos_profile_id(context, qos_policy_id) + profiles.append(qos_profile_id) + name = self._get_port_name(context, port_data) nsx_net_id = port_data[pbin.VIF_DETAILS]['nsx-logical-switch-id'] - result = self._port_client.create( - nsx_net_id, vif_uuid, - tags=tags, - name=name, - admin_state=port_data['admin_state_up'], - address_bindings=address_bindings, - attachment_type=attachment_type, - parent_name=parent_name, parent_tag=tag, - switch_profile_ids=profiles) + try: + result = self._port_client.create( + nsx_net_id, vif_uuid, + tags=tags, + name=name, + admin_state=port_data['admin_state_up'], + address_bindings=address_bindings, + attachment_type=attachment_type, + parent_name=parent_name, parent_tag=tag, + switch_profile_ids=profiles) + except nsx_exc.ManagerError as inst: + # we may fail if the QoS is not supported for this port + # (for example - transport zone with KVM) + LOG.exception(_LE("Unable to create port on the backend: %s"), + inst) + msg = _("Unable to create port on the backend") + raise nsx_exc.NsxPluginException(err_msg=msg) + # Attach the policy to the port in the neutron DB + if qos_policy_id: + qos_utils.update_port_policy_binding(context, + port_data['id'], + qos_policy_id) return result def _validate_address_pairs(self, address_pairs): @@ -879,6 +949,15 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, if lport_id: self._port_client.delete(lport_id) + def _assert_on_external_net_with_qos(self, port_data): + # Prevent creating/update port with QoS policy + # on external networks. + if attributes.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)): + err_msg = _("Unable to update/create a port with an external " + "network and a QoS policy") + LOG.warning(err_msg) + raise n_exc.InvalidInput(error_message=err_msg) + def create_port(self, context, port, l2gw_port_check=False): port_data = port['port'] dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS, []) @@ -889,6 +968,7 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, context, port_data['network_id']) if is_external_net: self._assert_on_external_net_with_compute(port_data) + self._assert_on_external_net_with_qos(port_data) neutron_db = super(NsxV3Plugin, self).create_port(context, port) port["port"].update(neutron_db) @@ -1103,15 +1183,49 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, if updated_device_owner == const.DEVICE_OWNER_DHCP: switch_profile_ids.append(self._dhcp_profile) - self._port_client.update( - lport_id, vif_uuid, name=name, - attachment_type=attachment_type, - admin_state=updated_port.get('admin_state_up'), - address_bindings=address_bindings, - switch_profile_ids=switch_profile_ids, - resources=resources, - parent_name=parent_name, - parent_tag=tag) + # Update QoS switch profile + qos_policy_id, qos_profile_id = self._get_port_qos_ids(context, + updated_port) + if qos_profile_id is not None: + switch_profile_ids.append(qos_profile_id) + + try: + self._port_client.update( + lport_id, vif_uuid, name=name, + attachment_type=attachment_type, + admin_state=updated_port.get('admin_state_up'), + address_bindings=address_bindings, + switch_profile_ids=switch_profile_ids, + resources=resources, + parent_name=parent_name, + parent_tag=tag) + except nsx_exc.ManagerError as inst: + # we may fail if the QoS is not supported for this port + # (for example - transport zone with KVM) + LOG.exception(_LE("Unable to update port on the backend: %s"), + inst) + msg = _("Unable to update port on the backend") + raise nsx_exc.NsxPluginException(err_msg=msg) + + # Attach/Detach the QoS policies to the port in the neutron DB + qos_utils.update_port_policy_binding(context, + updated_port['id'], + qos_policy_id) + + def _get_port_qos_ids(self, context, updated_port): + # when a port is updated, get the current QoS policy/profile ids + policy_id = None + profile_id = None + if (qos_consts.QOS_POLICY_ID in updated_port): + policy_id = updated_port[qos_consts.QOS_POLICY_ID] + else: + # Look for the previous QoS policy + policy_id = qos_utils.get_port_policy_id( + context, updated_port['id']) + if policy_id is not None: + profile_id = self._get_qos_profile_id(context, policy_id) + + return policy_id, profile_id def update_port(self, context, id, port): switch_profile_ids = None @@ -1124,6 +1238,7 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin, context, original_port['network_id']) if is_external_net: self._assert_on_external_net_with_compute(port['port']) + self._assert_on_external_net_with_qos(port['port']) updated_port = super(NsxV3Plugin, self).update_port(context, id, port) diff --git a/vmware_nsx/services/qos/nsx_v3/message_queue.py b/vmware_nsx/services/qos/nsx_v3/message_queue.py new file mode 100644 index 0000000000..a155fdb5de --- /dev/null +++ b/vmware_nsx/services/qos/nsx_v3/message_queue.py @@ -0,0 +1,29 @@ +# Copyright 2016 VMware, Inc. +# +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.api.rpc.callbacks import events +from neutron.services.qos.notification_drivers import message_queue + + +class NsxV3QosNotificationDriver( + message_queue.RpcQosServiceNotificationDriver): + """NSXv3 message queue service notification driver for QoS. + Overriding the create_policy method in order to add a notification + message in this case too. + """ + + def create_policy(self, context, policy): + self.notification_api.push(context, policy, events.CREATED) diff --git a/vmware_nsx/services/qos/nsx_v3/utils.py b/vmware_nsx/services/qos/nsx_v3/utils.py new file mode 100644 index 0000000000..41728a5eee --- /dev/null +++ b/vmware_nsx/services/qos/nsx_v3/utils.py @@ -0,0 +1,181 @@ +# Copyright 2016 VMware, Inc. +# +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.api.rpc.callbacks import events as callbacks_events +from neutron.api.v2 import attributes +from neutron import context as n_context +from neutron.objects.qos import policy as qos_policy +from oslo_log import log as logging + +from vmware_nsx._i18n import _ +from vmware_nsx.common import exceptions as nsx_exc +from vmware_nsx.common import utils +from vmware_nsx.db import db as nsx_db +from vmware_nsx.nsxlib import v3 as nsxlib + +LOG = logging.getLogger(__name__) +MAX_KBPS_MIN_VALUE = 1024 + + +def update_port_policy_binding(context, port_id, new_policy_id): + # detach the old policy (if exists) from the port + old_policy = qos_policy.QosPolicy.get_port_policy( + context, port_id) + if old_policy: + if old_policy.id == new_policy_id: + return + old_policy.detach_port(port_id) + + # attach the new policy (if exists) to the port + if new_policy_id is not None: + new_policy = qos_policy.QosPolicy.get_object( + context, id=new_policy_id) + if new_policy: + new_policy.attach_port(port_id) + + +def get_port_policy_id(context, port_id): + policy = qos_policy.QosPolicy.get_port_policy( + context, port_id) + if policy: + return policy.id + return + + +def handle_qos_notification(policy_obj, event_type): + handler = QosNotificationsHandler() + context = n_context.get_admin_context() + + # Reload the policy as admin so we will have a context + if (event_type != callbacks_events.DELETED): + policy = qos_policy.QosPolicy.get_object(context, id=policy_obj.id) + + # Check if QoS policy rule was created/deleted/updated + if (event_type == callbacks_events.CREATED): + handler.create_policy(context, policy) + + elif (event_type == callbacks_events.UPDATED): + if (hasattr(policy_obj, "rules")): + # With rules - the policy rule was created / deleted / updated + rules = policy_obj["rules"] + if not len(rules): + # the rule was deleted + handler.delete_policy_bandwidth_limit_rule( + context, policy_obj.id) + else: + # New or updated rule + handler.create_or_update_policy_bandwidth_limit_rule( + context, policy_obj.id, rules[0]) + else: + # Without rules - need to update only name / description + handler.update_policy(context, policy_obj.id, policy) + + elif (event_type == callbacks_events.DELETED): + handler.delete_policy(context, policy_obj.id) + + else: + msg = _("Unknown QoS notification event %s") % event_type + raise nsx_exc.NsxPluginException(err_msg=msg) + + +class QosNotificationsHandler(object): + + def __init__(self): + super(QosNotificationsHandler, self).__init__() + + def _get_tags(self, context, policy): + policy_dict = {'id': policy.id, 'tenant_id': policy.tenant_id} + return utils.build_v3_tags_payload( + policy_dict, resource_type='os-neutron-qos-id', + project_name=context.tenant_name) + + def create_policy(self, context, policy): + policy_id = policy.id + tags = self._get_tags(context, policy) + result = nsxlib.create_qos_switching_profile( + tags=tags, name=policy.name, + description=policy.description) + if not result or not attributes.is_attr_set(result.get('id')): + msg = _("Unable to create QoS switching profile on the backend") + raise nsx_exc.NsxPluginException(err_msg=msg) + profile_id = result['id'] + + # Add the mapping entry of the policy_id <-> profile_id + nsx_db.add_qos_policy_profile_mapping(context.session, + policy_id, + profile_id) + + def delete_policy(self, context, policy_id): + profile_id = nsx_db.get_switch_profile_by_qos_policy( + context.session, policy_id) + nsxlib.delete_qos_switching_profile(profile_id) + + def update_policy(self, context, policy_id, policy): + profile_id = nsx_db.get_switch_profile_by_qos_policy( + context.session, policy_id) + tags = self._get_tags(context, policy) + nsxlib.update_qos_switching_profile( + profile_id, + tags=tags, + name=policy.name, + description=policy.description) + + def _get_bw_values_from_rule(self, bw_rule): + """Translate the neutron bandwidth_limit_rule values, into the + values expected by the NSX-v3 QoS switch profile, + and validate that those are legal + """ + # validate the max_kbps - it must be at least 1Mbps for the + # switch profile configuration to succeed. + if (bw_rule.max_kbps < MAX_KBPS_MIN_VALUE): + raise nsx_exc.NsxQosSmallBw() + + # 'None' value means we will keep the old value + burst_size = peak_bandwidth = average_bandwidth = None + + # translate kbps -> bytes + burst_size = int(bw_rule.max_burst_kbps) * 128 + + # translate kbps -> Mbps + peak_bandwidth = int(float(bw_rule.max_kbps) / 1024) + # neutron QoS does not support this parameter + average_bandwidth = peak_bandwidth + + return burst_size, peak_bandwidth, average_bandwidth + + def create_or_update_policy_bandwidth_limit_rule(self, context, policy_id, + bw_rule): + """Update the QoS switch profile with the BW limitations of a + new or updated bandwidth limit rule + """ + profile_id = nsx_db.get_switch_profile_by_qos_policy( + context.session, policy_id) + + burst_size, peak_bw, average_bw = self._get_bw_values_from_rule( + bw_rule) + + nsxlib.update_qos_switching_profile_shaping( + profile_id, + shaping_enabled=True, + burst_size=burst_size, + peak_bandwidth=peak_bw, + average_bandwidth=average_bw) + + def delete_policy_bandwidth_limit_rule(self, context, policy_id): + profile_id = nsx_db.get_switch_profile_by_qos_policy( + context.session, policy_id) + nsxlib.update_qos_switching_profile_shaping( + profile_id, shaping_enabled=False) diff --git a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py index 5e3758f0c4..b24e753fce 100644 --- a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py @@ -216,6 +216,11 @@ class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin, VIF_TYPE = portbindings.VIF_TYPE_OVS HAS_PORT_FILTER = True + def setUp(self): + super(TestPortsV2, self).setUp() + self.plugin = manager.NeutronManager.get_plugin() + self.ctx = context.get_admin_context() + def test_update_port_delete_ip(self): # This test case overrides the default because the nsx plugin # implements port_security/security groups and it is not allowed @@ -258,6 +263,54 @@ class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin, self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int) + def test_create_port_with_qos(self): + with self.network() as network: + policy_id = uuidutils.generate_uuid() + data = {'port': { + 'network_id': network['network']['id'], + 'tenant_id': self._tenant_id, + 'qos_policy_id': policy_id, + 'name': 'qos_port', + 'admin_state_up': True, + 'device_id': 'fake_device', + 'device_owner': 'fake_owner', + 'fixed_ips': [], + 'mac_address': '00:00:00:00:00:01'} + } + with mock.patch.object(self.plugin, '_get_qos_profile_id'): + port = self.plugin.create_port(self.ctx, data) + self.assertEqual(policy_id, port['qos_policy_id']) + + def test_update_port_with_qos(self): + with self.network() as network: + data = {'port': { + 'network_id': network['network']['id'], + 'tenant_id': self._tenant_id, + 'name': 'qos_port', + 'admin_state_up': True, + 'device_id': 'fake_device', + 'device_owner': 'fake_owner', + 'fixed_ips': [], + 'mac_address': '00:00:00:00:00:01'} + } + port = self.plugin.create_port(self.ctx, data) + policy_id = uuidutils.generate_uuid() + data['port']['qos_policy_id'] = policy_id + with mock.patch.object(self.plugin, '_get_qos_profile_id'): + res = self.plugin.update_port(self.ctx, port['id'], data) + self.assertEqual(policy_id, res['qos_policy_id']) + + def test_create_ext_port_with_qos_fail(self): + with self._create_l3_ext_network() as network: + with self.subnet(network=network, cidr='10.0.0.0/24'): + policy_id = uuidutils.generate_uuid() + data = {'port': {'network_id': network['network']['id'], + 'tenant_id': self._tenant_id, + 'qos_policy_id': policy_id}} + # Cannot add qos policy to a port on ext network + self.assertRaises(n_exc.InvalidInput, + self.plugin.create_port, self.ctx, data) + class DHCPOptsTestCase(test_dhcpopts.TestExtraDhcpOpt, NsxV3PluginTestCaseMixin): diff --git a/vmware_nsx/tests/unit/nsxlib/v3/test_qos_switching_profile.py b/vmware_nsx/tests/unit/nsxlib/v3/test_qos_switching_profile.py index c9ebb0c571..8266deb664 100644 --- a/vmware_nsx/tests/unit/nsxlib/v3/test_qos_switching_profile.py +++ b/vmware_nsx/tests/unit/nsxlib/v3/test_qos_switching_profile.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import mock + from oslo_log import log from oslo_serialization import jsonutils @@ -26,7 +28,8 @@ LOG = log.getLogger(__name__) class NsxLibQosTestCase(nsxlib_testcase.NsxClientTestCase): - def _body(self, qos_marking=None, dscp=None): + def _body(self, qos_marking=None, dscp=None, + description=test_constants_v3.FAKE_NAME): body = { "resource_type": "QosSwitchingProfile", "tags": [] @@ -37,7 +40,30 @@ class NsxLibQosTestCase(nsxlib_testcase.NsxClientTestCase): if dscp: body["dscp"]["priority"] = dscp body["display_name"] = test_constants_v3.FAKE_NAME - body["description"] = test_constants_v3.FAKE_NAME + body["description"] = description + + return body + + def _body_with_shaping(self, shaping_enabled=False, + burst_size=None, + peak_bandwidth=None, + average_bandwidth=None, + description=test_constants_v3.FAKE_NAME): + body = test_constants_v3.FAKE_QOS_PROFILE + body["display_name"] = test_constants_v3.FAKE_NAME + body["description"] = description + + for shaper in body["shaper_configuration"]: + # Neutron currently support only shaping of Egress traffic + if shaper["resource_type"] == "EgressRateShaper": + shaper["enabled"] = shaping_enabled + if burst_size: + shaper["burst_size_bytes"] = burst_size + if peak_bandwidth: + shaper["peak_bandwidth_mbps"] = peak_bandwidth + if average_bandwidth: + shaper["average_bandwidth_mbps"] = average_bandwidth + break return body @@ -75,6 +101,90 @@ class NsxLibQosTestCase(nsxlib_testcase.NsxClientTestCase): data=jsonutils.dumps(self._body(qos_marking='trusted', dscp=0), sort_keys=True)) + def test_update_qos_switching_profile(self): + """ + Test updating a qos-switching profile returns the correct response + """ + api = self.mocked_rest_fns(nsxlib, 'client') + + original_profile = self._body() + new_description = "Test" + with mock.patch.object(nsxlib.client, 'get_resource', + return_value=original_profile): + # update the description of the profile + nsxlib.update_qos_switching_profile( + test_constants_v3.FAKE_QOS_PROFILE['id'], + tags=[], + description=new_description) + + test_client.assert_json_call( + 'put', api, + 'https://1.2.3.4/api/v1/switching-profiles/%s' + % test_constants_v3.FAKE_QOS_PROFILE['id'], + data=jsonutils.dumps(self._body(description=new_description), + sort_keys=True)) + + def test_enable_qos_switching_profile_shaping(self): + """ + Test updating a qos-switching profile returns the correct response + """ + api = self.mocked_rest_fns(nsxlib, 'client') + + original_profile = self._body_with_shaping() + burst_size = 100 + peak_bandwidth = 200 + average_bandwidth = 300 + with mock.patch.object(nsxlib.client, 'get_resource', + return_value=original_profile): + # update the bw shaping of the profile + nsxlib.update_qos_switching_profile_shaping( + test_constants_v3.FAKE_QOS_PROFILE['id'], + shaping_enabled=True, + burst_size=burst_size, + peak_bandwidth=peak_bandwidth, + average_bandwidth=average_bandwidth) + + test_client.assert_json_call( + 'put', api, + 'https://1.2.3.4/api/v1/switching-profiles/%s' + % test_constants_v3.FAKE_QOS_PROFILE['id'], + data=jsonutils.dumps( + self._body_with_shaping( + shaping_enabled=True, + burst_size=burst_size, + peak_bandwidth=peak_bandwidth, + average_bandwidth=average_bandwidth), + sort_keys=True)) + + def test_disable_qos_switching_profile_shaping(self): + """ + Test updating a qos-switching profile returns the correct response + """ + api = self.mocked_rest_fns(nsxlib, 'client') + + burst_size = 100 + peak_bandwidth = 200 + average_bandwidth = 300 + original_profile = self._body_with_shaping( + shaping_enabled=True, + burst_size=burst_size, + peak_bandwidth=peak_bandwidth, + average_bandwidth=average_bandwidth) + with mock.patch.object(nsxlib.client, 'get_resource', + return_value=original_profile): + # update the bw shaping of the profile + nsxlib.update_qos_switching_profile_shaping( + test_constants_v3.FAKE_QOS_PROFILE['id'], + shaping_enabled=False) + + test_client.assert_json_call( + 'put', api, + 'https://1.2.3.4/api/v1/switching-profiles/%s' + % test_constants_v3.FAKE_QOS_PROFILE['id'], + data=jsonutils.dumps( + self._body_with_shaping(), + sort_keys=True)) + def test_delete_qos_switching_profile(self): """ Test deleting qos-switching-profile diff --git a/vmware_nsx/tests/unit/services/qos/fake_notifier.py b/vmware_nsx/tests/unit/services/qos/fake_notifier.py new file mode 100644 index 0000000000..8ce91c541d --- /dev/null +++ b/vmware_nsx/tests/unit/services/qos/fake_notifier.py @@ -0,0 +1,31 @@ +# Copyright 2016 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.api.rpc.callbacks import events +from neutron.services.qos.notification_drivers import message_queue +from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils + + +class DummyNotificationDriver( + message_queue.RpcQosServiceNotificationDriver): + + def create_policy(self, context, policy): + qos_utils.handle_qos_notification(policy, events.CREATED) + + def update_policy(self, context, policy): + qos_utils.handle_qos_notification(policy, events.UPDATED) + + def delete_policy(self, context, policy): + qos_utils.handle_qos_notification(policy, events.DELETED) diff --git a/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py b/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py new file mode 100644 index 0000000000..a5fa53c8f9 --- /dev/null +++ b/vmware_nsx/tests/unit/services/qos/test_nsxv3_notification.py @@ -0,0 +1,182 @@ +# Copyright 2016 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock + +from oslo_config import cfg +from oslo_utils import uuidutils + +from neutron import context +from neutron.objects import base as base_object +from neutron.objects.qos import policy as policy_object +from neutron.objects.qos import rule as rule_object +from neutron.services.qos import qos_plugin +from neutron.tests.unit.services.qos import base + +from vmware_nsx.common import utils +from vmware_nsx.db import db as nsx_db +from vmware_nsx.nsxlib import v3 as nsxlib +from vmware_nsx.tests.unit.nsxlib.v3 import nsxlib_testcase + + +class TestQosNsxV3Notification(nsxlib_testcase.NsxClientTestCase, + base.BaseQosTestCase): + + def setUp(self): + super(TestQosNsxV3Notification, self).setUp() + self.setup_coreplugin() + + # Add a dummy notification driver that calls our handler directly + # (to skip the message queue) + cfg.CONF.set_override( + "notification_drivers", + ['vmware_nsx.tests.unit.services.qos.fake_notifier.' + 'DummyNotificationDriver'], + "qos") + + self.qos_plugin = qos_plugin.QoSPlugin() + self.ctxt = context.Context('fake_user', 'fake_tenant') + self.policy_data = { + 'policy': {'id': uuidutils.generate_uuid(), + 'tenant_id': uuidutils.generate_uuid(), + 'name': 'test-policy', + 'description': 'Test policy description', + 'shared': True}} + self.rule_data = { + 'bandwidth_limit_rule': {'id': uuidutils.generate_uuid(), + 'max_kbps': 2000, + 'max_burst_kbps': 150}} + + self.policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) + + self.rule = rule_object.QosBandwidthLimitRule( + self.ctxt, **self.rule_data['bandwidth_limit_rule']) + + self.fake_profile_id = 'fake_profile' + self.fake_profile = {'id': self.fake_profile_id} + + mock.patch('neutron.objects.db.api.create_object').start() + mock.patch('neutron.objects.db.api.update_object').start() + mock.patch('neutron.objects.db.api.delete_object').start() + mock.patch( + 'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start() + mock.patch.object(nsx_db, 'get_switch_profile_by_qos_policy', + return_value=self.fake_profile_id).start() + + @mock.patch( + 'neutron.objects.rbac_db.RbacNeutronDbObjectMixin' + '.create_rbac_policy') + @mock.patch.object(nsx_db, 'add_qos_policy_profile_mapping') + def test_policy_create_profile(self, fake_db_add, fake_rbac_create): + # test the switch profile creation when a QoS policy is created + with mock.patch.object(nsxlib, 'create_qos_switching_profile', + return_value=self.fake_profile) as create_profile: + with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object', + return_value=self.policy): + with mock.patch('neutron.objects.qos.policy.QosPolicy.create'): + policy = self.qos_plugin.create_policy(self.ctxt, + self.policy_data) + expected_tags = utils.build_v3_tags_payload( + policy, + resource_type='os-neutron-qos-id', + project_name=self.ctxt.tenant_name) + + create_profile.assert_called_once_with( + description=self.policy_data["policy"]["description"], + name=self.policy_data["policy"]["name"], + tags=expected_tags) + # verify that the policy->profile mapping entry was added + self.assertTrue(fake_db_add.called) + + @mock.patch( + 'neutron.objects.rbac_db.RbacNeutronDbObjectMixin' + '.create_rbac_policy') + def test_policy_update_profile(self, *mocks): + # test the switch profile update when a QoS policy is updated + fields = base_object.get_updatable_fields( + policy_object.QosPolicy, self.policy_data['policy']) + with mock.patch.object(nsxlib, + 'update_qos_switching_profile') as update_profile: + with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object', + return_value=self.policy): + with mock.patch('neutron.objects.qos.policy.QosPolicy.update'): + self.qos_plugin.update_policy( + self.ctxt, self.policy.id, {'policy': fields}) + # verify that the profile was updated with the correct data + self.policy_data["policy"]["id"] = self.policy.id + expected_tags = utils.build_v3_tags_payload( + self.policy_data["policy"], + resource_type='os-neutron-qos-id', + project_name=self.ctxt.tenant_name) + + update_profile.assert_called_once_with( + self.fake_profile_id, + description=self.policy_data["policy"]["description"], + name=self.policy_data["policy"]["name"], + tags=expected_tags + ) + + @mock.patch.object(policy_object.QosPolicy, 'reload_rules') + def test_rule_create_profile(self, *mocks): + # test the switch profile update when a QoS rule is created + _policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) + # add a rule to the policy + setattr(_policy, "rules", [self.rule]) + with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object', + return_value=_policy): + with mock.patch.object(nsxlib, + 'update_qos_switching_profile_shaping') as update_profile: + self.qos_plugin.update_policy_bandwidth_limit_rule( + self.ctxt, self.rule.id, _policy.id, self.rule_data) + + # validate the data on the profile + rule_dict = self.rule_data['bandwidth_limit_rule'] + expected_bw = rule_dict['max_kbps'] / 1024 + expected_burst = rule_dict['max_burst_kbps'] * 128 + update_profile.assert_called_once_with( + self.fake_profile_id, + average_bandwidth=expected_bw, + burst_size=expected_burst, + peak_bandwidth=expected_bw, + shaping_enabled=True + ) + + def test_rule_delete_profile(self): + # test the switch profile update when a QoS rule is deleted + _policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) + # The mock will return the policy without the rule, + # as if it was deleted + with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object', + return_value=_policy): + with mock.patch.object(nsxlib, + 'update_qos_switching_profile_shaping') as update_profile: + setattr(_policy, "rules", [self.rule]) + self.qos_plugin.delete_policy_bandwidth_limit_rule( + self.ctxt, self.rule.id, self.policy.id) + # validate the data on the profile + update_profile.assert_called_once_with( + self.fake_profile_id, + shaping_enabled=False + ) + + @mock.patch('neutron.objects.db.api.get_object', return_value=None) + def test_policy_delete_profile(self, *mocks): + # test the switch profile deletion when a QoS policy is deleted + with mock.patch.object(nsxlib, 'delete_qos_switching_profile', + return_value=self.fake_profile) as delete_profile: + self.qos_plugin.delete_policy(self.ctxt, self.policy.id) + delete_profile.assert_called_once_with(self.fake_profile_id)