From 1521c59999ad9ef8acfd3e1486a4b9f7903242c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C5=82awek=20Kap=C5=82o=C5=84ski?= Date: Sun, 16 Jul 2017 08:29:15 +0000 Subject: [PATCH] Add Neutron QoS dscp marking rule commands Create/Update/List/Get/Delete QoS DSCP marking rules is now possible to do with shade. Change-Id: I3b233c28c28cfa27e2d15599e34bae60bad83d75 --- shade/openstackcloud.py | 186 +++++++++++ .../tests/unit/test_qos_dscp_marking_rule.py | 294 ++++++++++++++++++ 2 files changed, 480 insertions(+) create mode 100644 shade/tests/unit/test_qos_dscp_marking_rule.py diff --git a/shade/openstackcloud.py b/shade/openstackcloud.py index c85ac0c34..bd3e5829e 100644 --- a/shade/openstackcloud.py +++ b/shade/openstackcloud.py @@ -3505,6 +3505,192 @@ class OpenStackCloud( return True + def search_qos_dscp_marking_rules(self, policy_name_or_id, rule_id=None, + filters=None): + """Search QoS DSCP marking rules + + :param string policy_name_or_id: Name or ID of the QoS policy to which + rules should be associated. + :param string rule_id: ID of searched rule. + :param filters: a dict containing additional filters to use. e.g. + {'dscp_mark': 32} + + :returns: a list of ``munch.Munch`` containing the dscp marking + rule descriptions. + + :raises: ``OpenStackCloudException`` if something goes wrong during the + OpenStack API call. + """ + rules = self.list_qos_dscp_marking_rules(policy_name_or_id, filters) + return _utils._filter_list(rules, rule_id, filters) + + def list_qos_dscp_marking_rules(self, policy_name_or_id, filters=None): + """List all available QoS DSCP marking rules. + + :param string policy_name_or_id: Name or ID of the QoS policy from + from rules should be listed. + :param filters: (optional) dict of filter conditions to push down + :returns: A list of ``munch.Munch`` containing rule info. + + :raises: ``OpenStackCloudResourceNotFound`` if QoS policy will not be + found. + """ + if not self._has_neutron_extension('qos'): + raise OpenStackCloudUnavailableExtension( + 'QoS extension is not available on target cloud') + + policy = self.get_qos_policy(policy_name_or_id) + if not policy: + raise OpenStackCloudResourceNotFound( + "QoS policy {name_or_id} not Found.".format( + name_or_id=policy_name_or_id)) + + # Translate None from search interface to empty {} for kwargs below + if not filters: + filters = {} + + data = self._network_client.get( + "/qos/policies/{policy_id}/dscp_marking_rules.json".format( + policy_id=policy['id']), + params=filters, + error_message="Error fetching QoS DSCP marking rules from " + "{policy}".format(policy=policy['id'])) + return meta.get_and_munchify('dscp_marking_rules', data) + + def get_qos_dscp_marking_rule(self, policy_name_or_id, rule_id): + """Get a QoS DSCP marking rule by name or ID. + + :param string policy_name_or_id: Name or ID of the QoS policy to which + rule should be associated. + :param rule_id: ID of the rule. + + :returns: A bandwidth limit rule ``munch.Munch`` or None if + no matching rule is found. + + """ + if not self._has_neutron_extension('qos'): + raise OpenStackCloudUnavailableExtension( + 'QoS extension is not available on target cloud') + + policy = self.get_qos_policy(policy_name_or_id) + if not policy: + raise OpenStackCloudResourceNotFound( + "QoS policy {name_or_id} not Found.".format( + name_or_id=policy_name_or_id)) + + data = self._network_client.get( + "/qos/policies/{policy_id}/dscp_marking_rules/{rule_id}.json". + format(policy_id=policy['id'], rule_id=rule_id), + error_message="Error fetching QoS DSCP marking rule {rule_id} " + "from {policy}".format(rule_id=rule_id, + policy=policy['id'])) + return meta.get_and_munchify('dscp_marking_rule', data) + + def create_qos_dscp_marking_rule(self, policy_name_or_id, dscp_mark=None): + """Create a QoS DSCP marking rule. + + :param string policy_name_or_id: Name or ID of the QoS policy to which + rule should be associated. + :param int dscp_mark: DSCP mark value + + :returns: The QoS DSCP marking rule. + :raises: OpenStackCloudException on operation error. + """ + if not self._has_neutron_extension('qos'): + raise OpenStackCloudUnavailableExtension( + 'QoS extension is not available on target cloud') + + policy = self.get_qos_policy(policy_name_or_id) + if not policy: + raise OpenStackCloudResourceNotFound( + "QoS policy {name_or_id} not Found.".format( + name_or_id=policy_name_or_id)) + + rule = {} + if dscp_mark: + rule['dscp_mark'] = dscp_mark + + data = self._network_client.post( + "/qos/policies/{policy_id}/dscp_marking_rules".format( + policy_id=policy['id']), + json={'dscp_marking_rule': rule}) + return meta.get_and_munchify('dscp_marking_rule', data) + + def update_qos_dscp_marking_rule(self, policy_name_or_id, rule_id, + dscp_mark=None): + """Update a QoS DSCP marking rule. + + :param string policy_name_or_id: Name or ID of the QoS policy to which + rule is associated. + :param string rule_id: ID of rule to update. + :param int dscp_mark: DSCP mark value + + :returns: The updated QoS bandwidth limit rule. + :raises: OpenStackCloudException on operation error. + """ + if not self._has_neutron_extension('qos'): + raise OpenStackCloudUnavailableExtension( + 'QoS extension is not available on target cloud') + + policy = self.get_qos_policy(policy_name_or_id) + if not policy: + raise OpenStackCloudResourceNotFound( + "QoS policy {name_or_id} not Found.".format( + name_or_id=policy_name_or_id)) + + rule = {} + if dscp_mark: + rule['dscp_mark'] = dscp_mark + if not rule: + self.log.debug("No QoS DSCP marking rule data to update") + return + + curr_rule = self.get_qos_dscp_marking_rule( + policy_name_or_id, rule_id) + if not curr_rule: + raise OpenStackCloudException( + "QoS dscp_marking_rule {rule_id} not found in policy " + "{policy_id}".format(rule_id=rule_id, + policy_id=policy['id'])) + + data = self._network_client.put( + "/qos/policies/{policy_id}/dscp_marking_rules/{rule_id}.json". + format(policy_id=policy['id'], rule_id=rule_id), + json={'dscp_marking_rule': rule}) + return meta.get_and_munchify('dscp_marking_rule', data) + + def delete_qos_dscp_marking_rule(self, policy_name_or_id, rule_id): + """Delete a QoS DSCP marking rule. + + :param string policy_name_or_id: Name or ID of the QoS policy to which + rule is associated. + :param string rule_id: ID of rule to update. + + :raises: OpenStackCloudException on operation error. + """ + if not self._has_neutron_extension('qos'): + raise OpenStackCloudUnavailableExtension( + 'QoS extension is not available on target cloud') + + policy = self.get_qos_policy(policy_name_or_id) + if not policy: + raise OpenStackCloudResourceNotFound( + "QoS policy {name_or_id} not Found.".format( + name_or_id=policy_name_or_id)) + + try: + self._network_client.delete( + "/qos/policies/{policy}/dscp_marking_rules/{rule}.json". + format(policy=policy['id'], rule=rule_id)) + except OpenStackCloudURINotFound: + self.log.debug( + "QoS DSCP marking rule {rule_id} not found in policy " + "{policy_id}. Ignoring.".format(rule_id=rule_id, + policy_id=policy['id'])) + return False + + return True + def _build_external_gateway_info(self, ext_gateway_net_id, enable_snat, ext_fixed_ips): info = {} diff --git a/shade/tests/unit/test_qos_dscp_marking_rule.py b/shade/tests/unit/test_qos_dscp_marking_rule.py new file mode 100644 index 000000000..ccb60e15f --- /dev/null +++ b/shade/tests/unit/test_qos_dscp_marking_rule.py @@ -0,0 +1,294 @@ +# Copyright 2017 OVH SAS +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +from shade import exc +from shade.tests.unit import base + + +class TestQosDscpMarkingRule(base.RequestsMockTestCase): + + policy_name = 'qos test policy' + policy_id = '881d1bb7-a663-44c0-8f9f-ee2765b74486' + project_id = 'c88fc89f-5121-4a4c-87fd-496b5af864e9' + + rule_id = 'ed1a2b05-0ad7-45d7-873f-008b575a02b3' + rule_dscp_mark = 32 + + mock_policy = { + 'id': policy_id, + 'name': policy_name, + 'description': '', + 'rules': [], + 'project_id': project_id, + 'tenant_id': project_id, + 'shared': False, + 'is_default': False + } + + mock_rule = { + 'id': rule_id, + 'dscp_mark': rule_dscp_mark, + } + + qos_extension = { + "updated": "2015-06-08T10:00:00-00:00", + "name": "Quality of Service", + "links": [], + "alias": "qos", + "description": "The Quality of Service extension." + } + + enabled_neutron_extensions = [qos_extension] + + def test_get_qos_dscp_marking_rule(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': [self.mock_policy]}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies', self.policy_id, + 'dscp_marking_rules', + '%s.json' % self.rule_id]), + json={'dscp_marking_rule': self.mock_rule}) + ]) + r = self.cloud.get_qos_dscp_marking_rule(self.policy_name, + self.rule_id) + self.assertDictEqual(self.mock_rule, r) + self.assert_calls() + + def test_get_qos_dscp_marking_rule_no_qos_policy_found(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': []}) + ]) + self.assertRaises( + exc.OpenStackCloudResourceNotFound, + self.cloud.get_qos_dscp_marking_rule, + self.policy_name, self.rule_id) + self.assert_calls() + + def test_get_qos_dscp_marking_rule_no_qos_extension(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': []}) + ]) + self.assertRaises( + exc.OpenStackCloudException, + self.cloud.get_qos_dscp_marking_rule, + self.policy_name, self.rule_id) + self.assert_calls() + + def test_create_qos_dscp_marking_rule(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': [self.mock_policy]}), + dict(method='POST', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies', self.policy_id, + 'dscp_marking_rules']), + json={'dscp_marking_rule': self.mock_rule}) + ]) + rule = self.cloud.create_qos_dscp_marking_rule( + self.policy_name, dscp_mark=self.rule_dscp_mark) + self.assertDictEqual(self.mock_rule, rule) + self.assert_calls() + + def test_create_qos_dscp_marking_rule_no_qos_extension(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': []}) + ]) + self.assertRaises( + exc.OpenStackCloudException, + self.cloud.create_qos_dscp_marking_rule, self.policy_name, + dscp_mark=16) + self.assert_calls() + + def test_update_qos_dscp_marking_rule(self): + new_dscp_mark_value = 16 + expected_rule = copy.copy(self.mock_rule) + expected_rule['dscp_mark'] = new_dscp_mark_value + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': [self.mock_policy]}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': [self.mock_policy]}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies', self.policy_id, + 'dscp_marking_rules', + '%s.json' % self.rule_id]), + json={'dscp_marking_rule': self.mock_rule}), + dict(method='PUT', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies', self.policy_id, + 'dscp_marking_rules', + '%s.json' % self.rule_id]), + json={'dscp_marking_rule': expected_rule}, + validate=dict( + json={'dscp_marking_rule': { + 'dscp_mark': new_dscp_mark_value}})) + ]) + rule = self.cloud.update_qos_dscp_marking_rule( + self.policy_id, self.rule_id, dscp_mark=new_dscp_mark_value) + self.assertDictEqual(expected_rule, rule) + self.assert_calls() + + def test_update_qos_dscp_marking_rule_no_qos_extension(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': []}) + ]) + self.assertRaises( + exc.OpenStackCloudException, + self.cloud.update_qos_dscp_marking_rule, + self.policy_id, self.rule_id, dscp_mark=8) + self.assert_calls() + + def test_delete_qos_dscp_marking_rule(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': [self.mock_policy]}), + dict(method='DELETE', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies', self.policy_id, + 'dscp_marking_rules', + '%s.json' % self.rule_id]), + json={}) + ]) + self.assertTrue( + self.cloud.delete_qos_dscp_marking_rule( + self.policy_name, self.rule_id)) + self.assert_calls() + + def test_delete_qos_dscp_marking_rule_no_qos_extension(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': []}) + ]) + self.assertRaises( + exc.OpenStackCloudException, + self.cloud.delete_qos_dscp_marking_rule, + self.policy_name, self.rule_id) + self.assert_calls() + + def test_delete_qos_dscp_marking_rule_not_found(self): + self.register_uris([ + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', append=['v2.0', 'extensions.json']), + json={'extensions': self.enabled_neutron_extensions}), + dict(method='GET', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies.json']), + json={'policies': [self.mock_policy]}), + dict(method='DELETE', + uri=self.get_mock_url( + 'network', 'public', + append=['v2.0', 'qos', 'policies', self.policy_id, + 'dscp_marking_rules', + '%s.json' % self.rule_id]), + status_code=404) + ]) + self.assertFalse( + self.cloud.delete_qos_dscp_marking_rule( + self.policy_name, self.rule_id)) + self.assert_calls()