Forbid regular users to reset admin-only attrs to default values

A regular user can reset an admin-only attribute to its default
value due to the fact that a corresponding policy rule is
enforced only in the case when an attribute is present in the
target AND has a non-default value.

Added a new attribute "attributes_to_update" which contains a list
of all to-be updated attributes to the body of the target that is
passed to policy.enforce.

Changed a check for whether an attribute is explicitly set.
Now, in the case of update, the function should not pay attention
to a default value of an attribute, but check whether it was
explicitly marked as being updated.

Added unit-tests.

Closes-Bug: #1357379
Related-Bug: #1338880
Change-Id: I6537bb1da5ef0d6899bc71e4e949f2c760c103c2
This commit is contained in:
Elena Ezhova 2014-08-19 15:54:36 +04:00
parent 92caa2eb6a
commit d841516fb5
5 changed files with 58 additions and 13 deletions

View File

@ -513,6 +513,10 @@ class Controller(object):
parent_id=parent_id)
orig_object_copy = copy.copy(orig_obj)
orig_obj.update(body[self._resource])
# Make a list of attributes to be updated to inform the policy engine
# which attributes are set explicitly so that it can distinguish them
# from the ones that are set to their default values.
orig_obj[const.ATTRIBUTES_TO_UPDATE] = body[self._resource].keys()
try:
policy.enforce(request.context,
action,

View File

@ -150,3 +150,5 @@ DEVICE_NAME_MAX_LEN = 15
# Device names start with "tap"
TAP_DEVICE_PREFIX = 'tap'
ATTRIBUTES_TO_UPDATE = 'attributes_to_update'

View File

@ -16,12 +16,15 @@
"""
Policy engine for neutron. Largely copied from nova.
"""
import collections
import itertools
import re
from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions
import neutron.common.utils as utils
from neutron.openstack.common import excutils
@ -118,14 +121,28 @@ def _set_rules(data):
policy.set_rules(policies)
def _is_attribute_explicitly_set(attribute_name, resource, target):
"""Verify that an attribute is present and has a non-default value."""
def _is_attribute_explicitly_set(attribute_name, resource, target, action):
"""Verify that an attribute is present and is explicitly set."""
if 'update' in action:
# In the case of update, the function should not pay attention to a
# default value of an attribute, but check whether it was explicitly
# marked as being updated instead.
return (attribute_name in target[const.ATTRIBUTES_TO_UPDATE] and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED)
return ('default' in resource[attribute_name] and
attribute_name in target and
target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and
target[attribute_name] != resource[attribute_name]['default'])
def _should_validate_sub_attributes(attribute, sub_attr):
"""Verify that sub-attributes are iterable and should be validated."""
validate = attribute.get('validate')
return (validate and isinstance(sub_attr, collections.Iterable) and
any([k.startswith('type:dict') and
v for (k, v) in validate.iteritems()]))
def _build_subattr_match_rule(attr_name, attr, action, target):
"""Create the rule to match for sub-attribute policy checks."""
# TODO(salv-orlando): Instead of relying on validator info, introduce
@ -173,16 +190,14 @@ def _build_match_rule(action, target):
for attribute_name in res_map[resource]:
if _is_attribute_explicitly_set(attribute_name,
res_map[resource],
target):
target, action):
attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name))
# Build match entries for sub-attributes, if present
validate = attribute.get('validate')
if (validate and any([k.startswith('type:dict') and v
for (k, v) in
validate.iteritems()])):
# Build match entries for sub-attributes
if _should_validate_sub_attributes(
attribute, target[attribute_name]):
attr_rule = policy.AndCheck(
[attr_rule, _build_subattr_match_rule(
attribute_name, attribute,

View File

@ -1207,6 +1207,18 @@ class SubresourceTest(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
network_id='id1',
dummy=body)
def test_update_subresource_to_none(self):
instance = self.plugin.return_value
dummy_id = _uuid()
body = {'dummy': {}}
self.api.put_json('/networks/id1' + _get_path('dummies', id=dummy_id),
body)
instance.update_network_dummy.assert_called_once_with(mock.ANY,
dummy_id,
network_id='id1',
dummy=body)
def test_delete_sub_resource(self):
instance = self.plugin.return_value

View File

@ -23,6 +23,7 @@ import six
import neutron
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions
from neutron import context
from neutron import manager
@ -282,9 +283,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
fake_manager_instance.plugin = plugin_klass()
def _test_action_on_attr(self, context, action, attr, value,
exception=None):
exception=None, **kwargs):
action = "%s_network" % action
target = {'tenant_id': 'the_owner', attr: value}
if kwargs:
target.update(kwargs)
if exception:
self.assertRaises(exception, policy.enforce,
context, action, target)
@ -293,10 +296,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.assertEqual(result, True)
def _test_nonadmin_action_on_attr(self, action, attr, value,
exception=None):
exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, attr,
value, exception)
value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
@ -313,9 +316,11 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
def _test_enforce_adminonly_attribute(self, action):
def _test_enforce_adminonly_attribute(self, action, **kwargs):
admin_context = context.get_admin_context()
target = {'shared': True}
if kwargs:
target.update(kwargs)
result = policy.enforce(admin_context, action, target)
self.assertEqual(result, True)
@ -323,7 +328,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self._test_enforce_adminonly_attribute('create_network')
def test_enforce_adminonly_attribute_update(self):
self._test_enforce_adminonly_attribute('update_network')
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_enforce_adminonly_attribute('update_network', **kwargs)
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
exceptions.PolicyNotAuthorized,
**kwargs)
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
del self.rules[policy.ADMIN_CTX_POLICY]