Add Neutron QoS dscp marking rule commands

Create/Update/List/Get/Delete QoS DSCP marking rules is
now possible to do with shade.

Change-Id: I3b233c28c28cfa27e2d15599e34bae60bad83d75
This commit is contained in:
Sławek Kapłoński 2017-07-16 08:29:15 +00:00
parent 85d8adaec6
commit 1521c59999
2 changed files with 480 additions and 0 deletions

View File

@ -3505,6 +3505,192 @@ class OpenStackCloud(
return True
def search_qos_dscp_marking_rules(self, policy_name_or_id, rule_id=None,
filters=None):
"""Search QoS DSCP marking rules
:param string policy_name_or_id: Name or ID of the QoS policy to which
rules should be associated.
:param string rule_id: ID of searched rule.
:param filters: a dict containing additional filters to use. e.g.
{'dscp_mark': 32}
:returns: a list of ``munch.Munch`` containing the dscp marking
rule descriptions.
:raises: ``OpenStackCloudException`` if something goes wrong during the
OpenStack API call.
"""
rules = self.list_qos_dscp_marking_rules(policy_name_or_id, filters)
return _utils._filter_list(rules, rule_id, filters)
def list_qos_dscp_marking_rules(self, policy_name_or_id, filters=None):
"""List all available QoS DSCP marking rules.
:param string policy_name_or_id: Name or ID of the QoS policy from
from rules should be listed.
:param filters: (optional) dict of filter conditions to push down
:returns: A list of ``munch.Munch`` containing rule info.
:raises: ``OpenStackCloudResourceNotFound`` if QoS policy will not be
found.
"""
if not self._has_neutron_extension('qos'):
raise OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
if not policy:
raise OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
# Translate None from search interface to empty {} for kwargs below
if not filters:
filters = {}
data = self._network_client.get(
"/qos/policies/{policy_id}/dscp_marking_rules.json".format(
policy_id=policy['id']),
params=filters,
error_message="Error fetching QoS DSCP marking rules from "
"{policy}".format(policy=policy['id']))
return meta.get_and_munchify('dscp_marking_rules', data)
def get_qos_dscp_marking_rule(self, policy_name_or_id, rule_id):
"""Get a QoS DSCP marking rule by name or ID.
:param string policy_name_or_id: Name or ID of the QoS policy to which
rule should be associated.
:param rule_id: ID of the rule.
:returns: A bandwidth limit rule ``munch.Munch`` or None if
no matching rule is found.
"""
if not self._has_neutron_extension('qos'):
raise OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
if not policy:
raise OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
data = self._network_client.get(
"/qos/policies/{policy_id}/dscp_marking_rules/{rule_id}.json".
format(policy_id=policy['id'], rule_id=rule_id),
error_message="Error fetching QoS DSCP marking rule {rule_id} "
"from {policy}".format(rule_id=rule_id,
policy=policy['id']))
return meta.get_and_munchify('dscp_marking_rule', data)
def create_qos_dscp_marking_rule(self, policy_name_or_id, dscp_mark=None):
"""Create a QoS DSCP marking rule.
:param string policy_name_or_id: Name or ID of the QoS policy to which
rule should be associated.
:param int dscp_mark: DSCP mark value
:returns: The QoS DSCP marking rule.
:raises: OpenStackCloudException on operation error.
"""
if not self._has_neutron_extension('qos'):
raise OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
if not policy:
raise OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
rule = {}
if dscp_mark:
rule['dscp_mark'] = dscp_mark
data = self._network_client.post(
"/qos/policies/{policy_id}/dscp_marking_rules".format(
policy_id=policy['id']),
json={'dscp_marking_rule': rule})
return meta.get_and_munchify('dscp_marking_rule', data)
def update_qos_dscp_marking_rule(self, policy_name_or_id, rule_id,
dscp_mark=None):
"""Update a QoS DSCP marking rule.
:param string policy_name_or_id: Name or ID of the QoS policy to which
rule is associated.
:param string rule_id: ID of rule to update.
:param int dscp_mark: DSCP mark value
:returns: The updated QoS bandwidth limit rule.
:raises: OpenStackCloudException on operation error.
"""
if not self._has_neutron_extension('qos'):
raise OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
if not policy:
raise OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
rule = {}
if dscp_mark:
rule['dscp_mark'] = dscp_mark
if not rule:
self.log.debug("No QoS DSCP marking rule data to update")
return
curr_rule = self.get_qos_dscp_marking_rule(
policy_name_or_id, rule_id)
if not curr_rule:
raise OpenStackCloudException(
"QoS dscp_marking_rule {rule_id} not found in policy "
"{policy_id}".format(rule_id=rule_id,
policy_id=policy['id']))
data = self._network_client.put(
"/qos/policies/{policy_id}/dscp_marking_rules/{rule_id}.json".
format(policy_id=policy['id'], rule_id=rule_id),
json={'dscp_marking_rule': rule})
return meta.get_and_munchify('dscp_marking_rule', data)
def delete_qos_dscp_marking_rule(self, policy_name_or_id, rule_id):
"""Delete a QoS DSCP marking rule.
:param string policy_name_or_id: Name or ID of the QoS policy to which
rule is associated.
:param string rule_id: ID of rule to update.
:raises: OpenStackCloudException on operation error.
"""
if not self._has_neutron_extension('qos'):
raise OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
if not policy:
raise OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
try:
self._network_client.delete(
"/qos/policies/{policy}/dscp_marking_rules/{rule}.json".
format(policy=policy['id'], rule=rule_id))
except OpenStackCloudURINotFound:
self.log.debug(
"QoS DSCP marking rule {rule_id} not found in policy "
"{policy_id}. Ignoring.".format(rule_id=rule_id,
policy_id=policy['id']))
return False
return True
def _build_external_gateway_info(self, ext_gateway_net_id, enable_snat,
ext_fixed_ips):
info = {}

View File

@ -0,0 +1,294 @@
# Copyright 2017 OVH SAS
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from shade import exc
from shade.tests.unit import base
class TestQosDscpMarkingRule(base.RequestsMockTestCase):
policy_name = 'qos test policy'
policy_id = '881d1bb7-a663-44c0-8f9f-ee2765b74486'
project_id = 'c88fc89f-5121-4a4c-87fd-496b5af864e9'
rule_id = 'ed1a2b05-0ad7-45d7-873f-008b575a02b3'
rule_dscp_mark = 32
mock_policy = {
'id': policy_id,
'name': policy_name,
'description': '',
'rules': [],
'project_id': project_id,
'tenant_id': project_id,
'shared': False,
'is_default': False
}
mock_rule = {
'id': rule_id,
'dscp_mark': rule_dscp_mark,
}
qos_extension = {
"updated": "2015-06-08T10:00:00-00:00",
"name": "Quality of Service",
"links": [],
"alias": "qos",
"description": "The Quality of Service extension."
}
enabled_neutron_extensions = [qos_extension]
def test_get_qos_dscp_marking_rule(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'dscp_marking_rules',
'%s.json' % self.rule_id]),
json={'dscp_marking_rule': self.mock_rule})
])
r = self.cloud.get_qos_dscp_marking_rule(self.policy_name,
self.rule_id)
self.assertDictEqual(self.mock_rule, r)
self.assert_calls()
def test_get_qos_dscp_marking_rule_no_qos_policy_found(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': []})
])
self.assertRaises(
exc.OpenStackCloudResourceNotFound,
self.cloud.get_qos_dscp_marking_rule,
self.policy_name, self.rule_id)
self.assert_calls()
def test_get_qos_dscp_marking_rule_no_qos_extension(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': []})
])
self.assertRaises(
exc.OpenStackCloudException,
self.cloud.get_qos_dscp_marking_rule,
self.policy_name, self.rule_id)
self.assert_calls()
def test_create_qos_dscp_marking_rule(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': [self.mock_policy]}),
dict(method='POST',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'dscp_marking_rules']),
json={'dscp_marking_rule': self.mock_rule})
])
rule = self.cloud.create_qos_dscp_marking_rule(
self.policy_name, dscp_mark=self.rule_dscp_mark)
self.assertDictEqual(self.mock_rule, rule)
self.assert_calls()
def test_create_qos_dscp_marking_rule_no_qos_extension(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': []})
])
self.assertRaises(
exc.OpenStackCloudException,
self.cloud.create_qos_dscp_marking_rule, self.policy_name,
dscp_mark=16)
self.assert_calls()
def test_update_qos_dscp_marking_rule(self):
new_dscp_mark_value = 16
expected_rule = copy.copy(self.mock_rule)
expected_rule['dscp_mark'] = new_dscp_mark_value
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'dscp_marking_rules',
'%s.json' % self.rule_id]),
json={'dscp_marking_rule': self.mock_rule}),
dict(method='PUT',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'dscp_marking_rules',
'%s.json' % self.rule_id]),
json={'dscp_marking_rule': expected_rule},
validate=dict(
json={'dscp_marking_rule': {
'dscp_mark': new_dscp_mark_value}}))
])
rule = self.cloud.update_qos_dscp_marking_rule(
self.policy_id, self.rule_id, dscp_mark=new_dscp_mark_value)
self.assertDictEqual(expected_rule, rule)
self.assert_calls()
def test_update_qos_dscp_marking_rule_no_qos_extension(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': []})
])
self.assertRaises(
exc.OpenStackCloudException,
self.cloud.update_qos_dscp_marking_rule,
self.policy_id, self.rule_id, dscp_mark=8)
self.assert_calls()
def test_delete_qos_dscp_marking_rule(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'dscp_marking_rules',
'%s.json' % self.rule_id]),
json={})
])
self.assertTrue(
self.cloud.delete_qos_dscp_marking_rule(
self.policy_name, self.rule_id))
self.assert_calls()
def test_delete_qos_dscp_marking_rule_no_qos_extension(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': []})
])
self.assertRaises(
exc.OpenStackCloudException,
self.cloud.delete_qos_dscp_marking_rule,
self.policy_name, self.rule_id)
self.assert_calls()
def test_delete_qos_dscp_marking_rule_not_found(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions.json']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies.json']),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'dscp_marking_rules',
'%s.json' % self.rule_id]),
status_code=404)
])
self.assertFalse(
self.cloud.delete_qos_dscp_marking_rule(
self.policy_name, self.rule_id))
self.assert_calls()