Merge "Forbid regular users to reset admin-only attrs to default values"

This commit is contained in:
Jenkins 2014-09-25 03:58:57 +00:00 committed by Gerrit Code Review
commit cee9645572
5 changed files with 58 additions and 13 deletions

View File

@ -513,6 +513,10 @@ class Controller(object):
parent_id=parent_id) parent_id=parent_id)
orig_object_copy = copy.copy(orig_obj) orig_object_copy = copy.copy(orig_obj)
orig_obj.update(body[self._resource]) orig_obj.update(body[self._resource])
# Make a list of attributes to be updated to inform the policy engine
# which attributes are set explicitly so that it can distinguish them
# from the ones that are set to their default values.
orig_obj[const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
try: try:
policy.enforce(request.context, policy.enforce(request.context,
action, action,

View File

@ -150,3 +150,5 @@ DEVICE_NAME_MAX_LEN = 15
# Device names start with "tap" # Device names start with "tap"
TAP_DEVICE_PREFIX = 'tap' TAP_DEVICE_PREFIX = 'tap'
ATTRIBUTES_TO_UPDATE = 'attributes_to_update'

View File

@ -16,6 +16,8 @@
""" """
Policy engine for neutron. Largely copied from nova. Policy engine for neutron. Largely copied from nova.
""" """
import collections
import itertools import itertools
import logging import logging
import re import re
@ -23,6 +25,7 @@ import re
from oslo.config import cfg from oslo.config import cfg
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions from neutron.common import exceptions
import neutron.common.utils as utils import neutron.common.utils as utils
from neutron.openstack.common import excutils from neutron.openstack.common import excutils
@ -119,14 +122,28 @@ def _set_rules(data):
policy.set_rules(policies) policy.set_rules(policies)
def _is_attribute_explicitly_set(attribute_name, resource, target): def _is_attribute_explicitly_set(attribute_name, resource, target, action):
"""Verify that an attribute is present and has a non-default value.""" """Verify that an attribute is present and is explicitly set."""
if 'update' in action:
# In the case of update, the function should not pay attention to a
# default value of an attribute, but check whether it was explicitly
# marked as being updated instead.
return (attribute_name in target[const.ATTRIBUTES_TO_UPDATE] and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED)
return ('default' in resource[attribute_name] and return ('default' in resource[attribute_name] and
attribute_name in target and attribute_name in target and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and
target[attribute_name] != resource[attribute_name]['default']) target[attribute_name] != resource[attribute_name]['default'])
def _should_validate_sub_attributes(attribute, sub_attr):
"""Verify that sub-attributes are iterable and should be validated."""
validate = attribute.get('validate')
return (validate and isinstance(sub_attr, collections.Iterable) and
any([k.startswith('type:dict') and
v for (k, v) in validate.iteritems()]))
def _build_subattr_match_rule(attr_name, attr, action, target): def _build_subattr_match_rule(attr_name, attr, action, target):
"""Create the rule to match for sub-attribute policy checks.""" """Create the rule to match for sub-attribute policy checks."""
# TODO(salv-orlando): Instead of relying on validator info, introduce # TODO(salv-orlando): Instead of relying on validator info, introduce
@ -184,16 +201,14 @@ def _build_match_rule(action, target):
for attribute_name in res_map[resource]: for attribute_name in res_map[resource]:
if _is_attribute_explicitly_set(attribute_name, if _is_attribute_explicitly_set(attribute_name,
res_map[resource], res_map[resource],
target): target, action):
attribute = res_map[resource][attribute_name] attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute: if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' % attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name)) (action, attribute_name))
# Build match entries for sub-attributes, if present # Build match entries for sub-attributes
validate = attribute.get('validate') if _should_validate_sub_attributes(
if (validate and any([k.startswith('type:dict') and v attribute, target[attribute_name]):
for (k, v) in
validate.iteritems()])):
attr_rule = policy.AndCheck( attr_rule = policy.AndCheck(
[attr_rule, _build_subattr_match_rule( [attr_rule, _build_subattr_match_rule(
attribute_name, attribute, attribute_name, attribute,

View File

@ -1173,6 +1173,18 @@ class SubresourceTest(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
network_id='id1', network_id='id1',
dummy=body) dummy=body)
def test_update_subresource_to_none(self):
instance = self.plugin.return_value
dummy_id = _uuid()
body = {'dummy': {}}
self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
body)
instance.update_network_dummy.assert_called_once_with(mock.ANY,
dummy_id,
network_id='id1',
dummy=body)
def test_delete_sub_resource(self): def test_delete_sub_resource(self):
instance = self.plugin.return_value instance = self.plugin.return_value

View File

@ -23,6 +23,7 @@ import six
import neutron import neutron
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions from neutron.common import exceptions
from neutron import context from neutron import context
from neutron import manager from neutron import manager
@ -282,9 +283,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
fake_manager_instance.plugin = plugin_klass() fake_manager_instance.plugin = plugin_klass()
def _test_action_on_attr(self, context, action, attr, value, def _test_action_on_attr(self, context, action, attr, value,
exception=None): exception=None, **kwargs):
action = "%s_network" % action action = "%s_network" % action
target = {'tenant_id': 'the_owner', attr: value} target = {'tenant_id': 'the_owner', attr: value}
if kwargs:
target.update(kwargs)
if exception: if exception:
self.assertRaises(exception, policy.enforce, self.assertRaises(exception, policy.enforce,
context, action, target) context, action, target)
@ -293,10 +296,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.assertEqual(result, True) self.assertEqual(result, True)
def _test_nonadmin_action_on_attr(self, action, attr, value, def _test_nonadmin_action_on_attr(self, action, attr, value,
exception=None): exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user']) user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, attr, self._test_action_on_attr(user_context, action, attr,
value, exception) value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self): def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False, self._test_nonadmin_action_on_attr('create', 'shared', False,
@ -313,9 +316,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_nonadmin_read_on_shared_succeeds(self): def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True) self._test_nonadmin_action_on_attr('get', 'shared', True)
def _test_enforce_adminonly_attribute(self, action): def _test_enforce_adminonly_attribute(self, action, **kwargs):
admin_context = context.get_admin_context() admin_context = context.get_admin_context()
target = {'shared': True} target = {'shared': True}
if kwargs:
target.update(kwargs)
result = policy.enforce(admin_context, action, target) result = policy.enforce(admin_context, action, target)
self.assertEqual(result, True) self.assertEqual(result, True)
@ -323,7 +328,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self._test_enforce_adminonly_attribute('create_network') self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_update(self): def test_enforce_adminonly_attribute_update(self):
self._test_enforce_adminonly_attribute('update_network') kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_enforce_adminonly_attribute('update_network', **kwargs)
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
exceptions.PolicyNotAuthorized,
**kwargs)
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self): def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
del self.rules[policy.ADMIN_CTX_POLICY] del self.rules[policy.ADMIN_CTX_POLICY]