Updated policy module from oslo-incubator

Common policy has not been synced with oslo-incubator for a
long time and is seriously outdated.

This change pulls in fresh code from oslo-incubator which
introduces the Enforcer class to replace the old check function.

Rewrite neutron.policy using naming conventions and approach
that was set in Nova and amend related unit tests.
Remove neutron.common.exceptions.PolicyNotAuthorized and switch
to neutron.openstack.common.policy.PolicyNotAuthorized.
Drop Neutron specific policy_file option since now it is defined
in oslo-incubator policy module.

Change log:
4ca5091 Fixes nits in module policy
262fc82 Correct default rule name for policy.Enforcer
9e8b9f6 Minor fixes in policy module
6c706c5 Delete graduated serialization files
5d40e14 Remove code that moved to oslo.i18n
aebb58f Fix typo to show correct log message
bb410d9 Use MultiStrOpt for policy_dirs
33f44bf Add support for policy configration directories
2b966f9 Fix deletion of cached file for policy enforcer
238e601 Make policy debug logging less verbose
fe3389e Improve help strings
15722f1 Adds a flag to determine whether to reload the rules in policy
5d1f15a Documenting policy.json syntax
fcf517d Update oslo log messages with translation domains
e038d89 Fix policy tests for parallel testing
0da5de6 Allow policy.json resource vs constant check
e4b2334 Replaces use of urlutils with six in policy module
8b2b0b7 Use hacking import_exceptions for gettextutils._
0d8f18b Use urlutils functions instead of urllib/urllib2
12bcdb7 Remove vim header
9ef9fec Use six.string_type instead of basestring
4bfb7a2 Apply six for metaclass
1538c80 ConfigFileNotFoundError with proper argument
33533b0 Keystone user can't perform revoke_token
64bb5e2 Fix wrong argument in openstack common policy
b7edc99 Fix missing argument bug in oslo common policy
3626b6d Fix policy default_rule issue
7bf8ee9 Allow use of hacking 0.6.0 and enable new checks
e4ac367 Fix missing argument bug in oslo common policy
1a2df89 Enable H302 hacking check
7119e29 Enable hacking H404 test.
6d27681 Enable H306 hacking check.
1091b4f Reduce duplicated code related to policies

Closes-Bug: #1288178
Change-Id: I87ee30e2b64ec6b07faa84a231fd5f7eb925d501
This commit is contained in:
Elena Ezhova 2014-09-18 11:53:24 +04:00
parent b590218598
commit ad6bfc1ab6
9 changed files with 484 additions and 350 deletions

View File

@ -28,6 +28,7 @@ from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron import quota
@ -41,6 +42,7 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
exceptions.ServiceUnavailable: webob.exc.HTTPServiceUnavailable,
exceptions.NotAuthorized: webob.exc.HTTPForbidden,
netaddr.AddrFormatError: webob.exc.HTTPBadRequest,
common_policy.PolicyNotAuthorized: webob.exc.HTTPForbidden
}
@ -187,7 +189,7 @@ class Controller(object):
# Fetch the resource and verify if the user can access it
try:
resource = self._item(request, id, True)
except exceptions.PolicyNotAuthorized:
except common_policy.PolicyNotAuthorized:
msg = _('The resource could not be found.')
raise webob.exc.HTTPNotFound(msg)
body = kwargs.pop('body', None)
@ -326,7 +328,7 @@ class Controller(object):
field_list=field_list,
parent_id=parent_id),
fields_to_strip=added_fields)}
except exceptions.PolicyNotAuthorized:
except common_policy.PolicyNotAuthorized:
# To avoid giving away information, pretend that it
# doesn't exist
msg = _('The resource could not be found.')
@ -466,7 +468,7 @@ class Controller(object):
policy.enforce(request.context,
action,
obj)
except exceptions.PolicyNotAuthorized:
except common_policy.PolicyNotAuthorized:
# To avoid giving away information, pretend that it
# doesn't exist
msg = _('The resource could not be found.')
@ -521,7 +523,7 @@ class Controller(object):
policy.enforce(request.context,
action,
orig_obj)
except exceptions.PolicyNotAuthorized:
except common_policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception() as ctxt:
# If a tenant is modifying it's own object, it's safe to return
# a 403. Otherwise, pretend that it doesn't exist to avoid

View File

@ -27,6 +27,7 @@ import webob.exc
from neutron.common import exceptions
from neutron.openstack.common import gettextutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import policy as common_policy
from neutron import wsgi
@ -80,7 +81,8 @@ def Resource(controller, faults=None, deserializers=None, serializers=None):
result = method(request=request, **args)
except (exceptions.NeutronException,
netaddr.AddrFormatError) as e:
netaddr.AddrFormatError,
common_policy.PolicyNotAuthorized) as e:
for fault in faults:
if isinstance(e, fault):
mapped_exc = faults[fault]

View File

@ -41,8 +41,6 @@ core_opts = [
help=_("The API paste config file to use")),
cfg.StrOpt('api_extensions_path', default="",
help=_("The path for API extensions")),
cfg.StrOpt('policy_file', default="policy.json",
help=_("The policy file to use")),
cfg.StrOpt('auth_strategy', default='keystone',
help=_("The type of authentication to use")),
cfg.StrOpt('core_plugin',

View File

@ -71,10 +71,6 @@ class AdminRequired(NotAuthorized):
message = _("User does not have admin privileges: %(reason)s")
class PolicyNotAuthorized(NotAuthorized):
message = _("Policy doesn't allow %(action)s to be performed.")
class NetworkNotFound(NotFound):
message = _("Network %(net_id)s could not be found")

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
@ -48,6 +46,27 @@ policy rule::
project_id:%(project_id)s and not role:dunce
It is possible to perform policy checks on the following user
attributes (obtained through the token): user_id, domain_id or
project_id::
domain_id:<some_value>
Attributes sent along with API calls can be used by the policy engine
(on the right side of the expression), by using the following syntax::
<some_value>:user.id
Contextual attributes of objects identified by their IDs are loaded
from the database. They are also available to the policy engine and
can be checked through the `target` keyword::
<some_value>:target.role.name
All these attributes (related to users, API calls, and context) can be
checked against each other or against constants, be it literals (True,
<a_number>) or strings.
Finally, two special policy checks should be mentioned; the policy
check "@" will always accept an access, and the policy check "!" will
always reject an access. (Note that if a rule is either the empty
@ -57,34 +76,56 @@ as it allows particular rules to be explicitly disabled.
"""
import abc
import ast
import os
import re
import urllib
from oslo.config import cfg
from oslo.serialization import jsonutils
import six
import urllib2
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urlrequest
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import jsonutils
from neutron.openstack.common import fileutils
from neutron.openstack.common._i18n import _, _LE, _LW
from neutron.openstack.common import log as logging
policy_opts = [
cfg.StrOpt('policy_file',
default='policy.json',
help=_('The JSON file that defines policies.')),
cfg.StrOpt('policy_default_rule',
default='default',
help=_('Default rule. Enforced when a requested rule is not '
'found.')),
cfg.MultiStrOpt('policy_dirs',
default=['policy.d'],
help=_('Directories where policy configuration files are '
'stored.')),
]
CONF = cfg.CONF
CONF.register_opts(policy_opts)
LOG = logging.getLogger(__name__)
_rules = None
_checks = {}
class PolicyNotAuthorized(Exception):
def __init__(self, rule):
msg = _("Policy doesn't allow %s to be performed.") % rule
super(PolicyNotAuthorized, self).__init__(msg)
class Rules(dict):
"""
A store for rules. Handles the default_rule setting directly.
"""
"""A store for rules. Handles the default_rule setting directly."""
@classmethod
def load_json(cls, data, default_rule=None):
"""
Allow loading of JSON rule data.
"""
"""Allow loading of JSON rule data."""
# Suck in the JSON data and parse the rules
rules = dict((k, parse_rule(v)) for k, v in
@ -101,11 +142,22 @@ class Rules(dict):
def __missing__(self, key):
"""Implements the default rule handling."""
# If the default rule isn't actually defined, do something
# reasonably intelligent
if not self.default_rule or self.default_rule not in self:
if isinstance(self.default_rule, dict):
raise KeyError(key)
# If the default rule isn't actually defined, do something
# reasonably intelligent
if not self.default_rule:
raise KeyError(key)
if isinstance(self.default_rule, BaseCheck):
return self.default_rule
# We need to check this or we can get infinite recursion
if self.default_rule not in self:
raise KeyError(key)
elif isinstance(self.default_rule, six.string_types):
return self[self.default_rule]
def __str__(self):
@ -124,38 +176,133 @@ class Rules(dict):
return jsonutils.dumps(out_rules, indent=4)
# Really have to figure out a way to deprecate this
def set_rules(rules):
"""Set the rules in use for policy checks."""
class Enforcer(object):
"""Responsible for loading and enforcing rules.
global _rules
_rules = rules
# Ditto
def reset():
"""Clear the rules used for policy checks."""
global _rules
_rules = None
def check(rule, target, creds, exc=None, *args, **kwargs):
:param policy_file: Custom policy file to use, if none is
specified, `CONF.policy_file` will be
used.
:param rules: Default dictionary / Rules to use. It will be
considered just in the first instantiation. If
`load_rules(True)`, `clear()` or `set_rules(True)`
is called this will be overwritten.
:param default_rule: Default rule to use, CONF.default_rule will
be used if none is specified.
:param use_conf: Whether to load rules from cache or config file.
"""
Checks authorization of a rule against the target and credentials.
:param rule: The rule to evaluate.
def __init__(self, policy_file=None, rules=None,
default_rule=None, use_conf=True):
self.default_rule = default_rule or CONF.policy_default_rule
self.rules = Rules(rules, self.default_rule)
self.policy_path = None
self.policy_file = policy_file or CONF.policy_file
self.use_conf = use_conf
def set_rules(self, rules, overwrite=True, use_conf=False):
"""Create a new Rules object based on the provided dict of rules.
:param rules: New rules to use. It should be an instance of dict.
:param overwrite: Whether to overwrite current rules or update them
with the new rules.
:param use_conf: Whether to reload rules from cache or config file.
"""
if not isinstance(rules, dict):
raise TypeError(_("Rules must be an instance of dict or Rules, "
"got %s instead") % type(rules))
self.use_conf = use_conf
if overwrite:
self.rules = Rules(rules, self.default_rule)
else:
self.rules.update(rules)
def clear(self):
"""Clears Enforcer rules, policy's cache and policy's path."""
self.set_rules({})
fileutils.delete_cached_file(self.policy_path)
self.default_rule = None
self.policy_path = None
def load_rules(self, force_reload=False):
"""Loads policy_path's rules.
Policy file is cached and will be reloaded if modified.
:param force_reload: Whether to overwrite current rules.
"""
if force_reload:
self.use_conf = force_reload
if self.use_conf:
if not self.policy_path:
self.policy_path = self._get_policy_path(self.policy_file)
self._load_policy_file(self.policy_path, force_reload)
for path in CONF.policy_dirs:
try:
path = self._get_policy_path(path)
except cfg.ConfigFilesNotFoundError:
LOG.warn(_LW("Can not find policy directory: %s"), path)
continue
self._walk_through_policy_directory(path,
self._load_policy_file,
force_reload, False)
def _walk_through_policy_directory(self, path, func, *args):
# We do not iterate over sub-directories.
policy_files = next(os.walk(path))[2]
policy_files.sort()
for policy_file in [p for p in policy_files if not p.startswith('.')]:
func(os.path.join(path, policy_file), *args)
def _load_policy_file(self, path, force_reload, overwrite=True):
reloaded, data = fileutils.read_cached_file(
path, force_reload=force_reload)
if reloaded or not self.rules:
rules = Rules.load_json(data, self.default_rule)
self.set_rules(rules, overwrite)
LOG.debug("Rules successfully reloaded")
def _get_policy_path(self, path):
"""Locate the policy json data file/path.
:param path: It's value can be a full path or related path. When
full path specified, this function just returns the full
path. When related path specified, this function will
search configuration directories to find one that exists.
:returns: The policy path
:raises: ConfigFilesNotFoundError if the file/path couldn't
be located.
"""
policy_path = CONF.find_file(path)
if policy_path:
return policy_path
raise cfg.ConfigFilesNotFoundError((path,))
def enforce(self, rule, target, creds, do_raise=False,
exc=None, *args, **kwargs):
"""Checks authorization of a rule against the target and credentials.
:param rule: A string or BaseCheck instance specifying the rule
to evaluate.
:param target: As much information about the object being operated
on as possible, as a dictionary.
:param creds: As much information about the user performing the
action as possible, as a dictionary.
:param do_raise: Whether to raise an exception or not if check
fails.
:param exc: Class of the exception to raise if the check fails.
Any remaining arguments passed to check() (both
Any remaining arguments passed to enforce() (both
positional and keyword arguments) will be passed to
the exception class. If exc is not provided, returns
False.
the exception class. If not specified, PolicyNotAuthorized
will be used.
:return: Returns False if the policy does not allow the action and
exc is not provided; otherwise, returns a value that
@ -164,47 +311,48 @@ def check(rule, target, creds, exc=None, *args, **kwargs):
from the expression.
"""
self.load_rules()
# Allow the rule to be a Check tree
if isinstance(rule, BaseCheck):
result = rule(target, creds)
elif not _rules:
result = rule(target, creds, self)
elif not self.rules:
# No rules to reference means we're going to fail closed
result = False
else:
try:
# Evaluate the rule
result = _rules[rule](target, creds)
result = self.rules[rule](target, creds, self)
except KeyError:
LOG.debug("Rule [%s] doesn't exist" % rule)
# If the rule doesn't exist, fail closed
result = False
# If it is False, raise the exception if requested
if exc and result is False:
if do_raise and not result:
if exc:
raise exc(*args, **kwargs)
raise PolicyNotAuthorized(rule)
return result
@six.add_metaclass(abc.ABCMeta)
class BaseCheck(object):
"""
Abstract base class for Check classes.
"""
__metaclass__ = abc.ABCMeta
"""Abstract base class for Check classes."""
@abc.abstractmethod
def __str__(self):
"""
Retrieve a string representation of the Check tree rooted at
this node.
"""
"""String representation of the Check tree rooted at this node."""
pass
@abc.abstractmethod
def __call__(self, target, cred):
"""
Perform the check. Returns False to reject the access or a
def __call__(self, target, cred, enforcer):
"""Triggers if instance of the class is called.
Performs the check. Returns False to reject the access or a
true value (not necessary True) to accept the access.
"""
@ -212,44 +360,39 @@ class BaseCheck(object):
class FalseCheck(BaseCheck):
"""
A policy check that always returns False (disallow).
"""
"""A policy check that always returns False (disallow)."""
def __str__(self):
"""Return a string representation of this check."""
return "!"
def __call__(self, target, cred):
def __call__(self, target, cred, enforcer):
"""Check the policy."""
return False
class TrueCheck(BaseCheck):
"""
A policy check that always returns True (allow).
"""
"""A policy check that always returns True (allow)."""
def __str__(self):
"""Return a string representation of this check."""
return "@"
def __call__(self, target, cred):
def __call__(self, target, cred, enforcer):
"""Check the policy."""
return True
class Check(BaseCheck):
"""
A base class to allow for user-defined policy checks.
"""
"""A base class to allow for user-defined policy checks."""
def __init__(self, kind, match):
"""
"""Initiates Check instance.
:param kind: The kind of the check, i.e., the field before the
':'.
:param match: The match of the check, i.e., the field after
@ -266,14 +409,13 @@ class Check(BaseCheck):
class NotCheck(BaseCheck):
"""
"""Implements the "not" logical operator.
A policy check that inverts the result of another policy check.
Implements the "not" operator.
"""
def __init__(self, rule):
"""
Initialize the 'not' check.
"""Initialize the 'not' check.
:param rule: The rule to negate. Must be a Check.
"""
@ -285,24 +427,23 @@ class NotCheck(BaseCheck):
return "not %s" % self.rule
def __call__(self, target, cred):
"""
Check the policy. Returns the logical inverse of the wrapped
check.
def __call__(self, target, cred, enforcer):
"""Check the policy.
Returns the logical inverse of the wrapped check.
"""
return not self.rule(target, cred)
return not self.rule(target, cred, enforcer)
class AndCheck(BaseCheck):
"""
A policy check that requires that a list of other checks all
return True. Implements the "and" operator.
"""Implements the "and" logical operator.
A policy check that requires that a list of other checks all return True.
"""
def __init__(self, rules):
"""
Initialize the 'and' check.
"""Initialize the 'and' check.
:param rules: A list of rules that will be tested.
"""
@ -314,20 +455,21 @@ class AndCheck(BaseCheck):
return "(%s)" % ' and '.join(str(r) for r in self.rules)
def __call__(self, target, cred):
"""
Check the policy. Requires that all rules accept in order to
return True.
def __call__(self, target, cred, enforcer):
"""Check the policy.
Requires that all rules accept in order to return True.
"""
for rule in self.rules:
if not rule(target, cred):
if not rule(target, cred, enforcer):
return False
return True
def add_check(self, rule):
"""
"""Adds rule to be tested.
Allows addition of another rule to the list of rules that will
be tested. Returns the AndCheck object for convenience.
"""
@ -337,14 +479,14 @@ class AndCheck(BaseCheck):
class OrCheck(BaseCheck):
"""
"""Implements the "or" operator.
A policy check that requires that at least one of a list of other
checks returns True. Implements the "or" operator.
checks returns True.
"""
def __init__(self, rules):
"""
Initialize the 'or' check.
"""Initialize the 'or' check.
:param rules: A list of rules that will be tested.
"""
@ -356,20 +498,20 @@ class OrCheck(BaseCheck):
return "(%s)" % ' or '.join(str(r) for r in self.rules)
def __call__(self, target, cred):
"""
Check the policy. Requires that at least one rule accept in
order to return True.
def __call__(self, target, cred, enforcer):
"""Check the policy.
Requires that at least one rule accept in order to return True.
"""
for rule in self.rules:
if rule(target, cred):
if rule(target, cred, enforcer):
return True
return False
def add_check(self, rule):
"""
"""Adds rule to be tested.
Allows addition of another rule to the list of rules that will
be tested. Returns the OrCheck object for convenience.
"""
@ -379,9 +521,7 @@ class OrCheck(BaseCheck):
def _parse_check(rule):
"""
Parse a single base check rule into an appropriate Check object.
"""
"""Parse a single base check rule into an appropriate Check object."""
# Handle the special checks
if rule == '!':
@ -392,7 +532,7 @@ def _parse_check(rule):
try:
kind, match = rule.split(':', 1)
except Exception:
LOG.exception(_("Failed to understand rule %(rule)s") % locals())
LOG.exception(_LE("Failed to understand rule %s") % rule)
# If the rule is invalid, we'll fail closed
return FalseCheck()
@ -402,14 +542,14 @@ def _parse_check(rule):
elif None in _checks:
return _checks[None](kind, match)
else:
LOG.error(_("No handler for matches of kind %s") % kind)
LOG.error(_LE("No handler for matches of kind %s") % kind)
return FalseCheck()
def _parse_list_rule(rule):
"""
Provided for backwards compatibility. Translates the old
list-of-lists syntax into a tree of Check objects.
"""Translates the old list-of-lists syntax into a tree of Check objects.
Provided for backwards compatibility.
"""
# Empty rule defaults to True
@ -424,7 +564,7 @@ def _parse_list_rule(rule):
continue
# Handle bare strings
if isinstance(inner_rule, basestring):
if isinstance(inner_rule, six.string_types):
inner_rule = [inner_rule]
# Parse the inner rules into Check objects
@ -450,8 +590,7 @@ _tokenize_re = re.compile(r'\s+')
def _parse_tokenize(rule):
"""
Tokenizer for the policy language.
"""Tokenizer for the policy language.
Most of the single-character tokens are specified in the
_tokenize_re; however, parentheses need to be handled specially,
@ -500,16 +639,16 @@ def _parse_tokenize(rule):
class ParseStateMeta(type):
"""
Metaclass for the ParseState class. Facilitates identifying
reduction methods.
"""Metaclass for the ParseState class.
Facilitates identifying reduction methods.
"""
def __new__(mcs, name, bases, cls_dict):
"""
Create the class. Injects the 'reducers' list, a list of
tuples matching token sequences to the names of the
corresponding reduction methods.
"""Create the class.
Injects the 'reducers' list, a list of tuples matching token sequences
to the names of the corresponding reduction methods.
"""
reducers = []
@ -526,10 +665,10 @@ class ParseStateMeta(type):
def reducer(*tokens):
"""
Decorator for reduction methods. Arguments are a sequence of
tokens, in order, which should trigger running this reduction
method.
"""Decorator for reduction methods.
Arguments are a sequence of tokens, in order, which should trigger running
this reduction method.
"""
def decorator(func):
@ -545,11 +684,12 @@ def reducer(*tokens):
return decorator
@six.add_metaclass(ParseStateMeta)
class ParseState(object):
"""
Implement the core of parsing the policy language. Uses a greedy
reduction algorithm to reduce a sequence of tokens into a single
terminal, the value of which will be the root of the Check tree.
"""Implement the core of parsing the policy language.
Uses a greedy reduction algorithm to reduce a sequence of tokens into
a single terminal, the value of which will be the root of the Check tree.
Note: error reporting is rather lacking. The best we can get with
this parser formulation is an overall "parse failed" error.
@ -557,8 +697,6 @@ class ParseState(object):
shouldn't be that big a problem.
"""
__metaclass__ = ParseStateMeta
def __init__(self):
"""Initialize the ParseState."""
@ -566,11 +704,11 @@ class ParseState(object):
self.values = []
def reduce(self):
"""
Perform a greedy reduction of the token stream. If a reducer
method matches, it will be executed, then the reduce() method
will be called recursively to search for any more possible
reductions.
"""Perform a greedy reduction of the token stream.
If a reducer method matches, it will be executed, then the
reduce() method will be called recursively to search for any more
possible reductions.
"""
for reduction, methname in self.reducers:
@ -600,9 +738,9 @@ class ParseState(object):
@property
def result(self):
"""
Obtain the final result of the parse. Raises ValueError if
the parse failed to reduce to a single result.
"""Obtain the final result of the parse.
Raises ValueError if the parse failed to reduce to a single result.
"""
if len(self.values) != 1:
@ -619,35 +757,31 @@ class ParseState(object):
@reducer('check', 'and', 'check')
def _make_and_expr(self, check1, _and, check2):
"""
Create an 'and_expr' from two checks joined by the 'and'
operator.
"""Create an 'and_expr'.
Join two checks by the 'and' operator.
"""
return [('and_expr', AndCheck([check1, check2]))]
@reducer('and_expr', 'and', 'check')
def _extend_and_expr(self, and_expr, _and, check):
"""
Extend an 'and_expr' by adding one more check.
"""
"""Extend an 'and_expr' by adding one more check."""
return [('and_expr', and_expr.add_check(check))]
@reducer('check', 'or', 'check')
def _make_or_expr(self, check1, _or, check2):
"""
Create an 'or_expr' from two checks joined by the 'or'
operator.
"""Create an 'or_expr'.
Join two checks by the 'or' operator.
"""
return [('or_expr', OrCheck([check1, check2]))]
@reducer('or_expr', 'or', 'check')
def _extend_or_expr(self, or_expr, _or, check):
"""
Extend an 'or_expr' by adding one more check.
"""
"""Extend an 'or_expr' by adding one more check."""
return [('or_expr', or_expr.add_check(check))]
@ -659,7 +793,8 @@ class ParseState(object):
def _parse_text_rule(rule):
"""
"""Parses policy to the tree.
Translates a policy written in the policy language into a tree of
Check objects.
"""
@ -677,26 +812,23 @@ def _parse_text_rule(rule):
return state.result
except ValueError:
# Couldn't parse the rule
LOG.exception(_("Failed to understand rule %(rule)r") % locals())
LOG.exception(_LE("Failed to understand rule %s") % rule)
# Fail closed
return FalseCheck()
def parse_rule(rule):
"""
Parses a policy rule into a tree of Check objects.
"""
"""Parses a policy rule into a tree of Check objects."""
# If the rule is a string, it's in the policy language
if isinstance(rule, basestring):
if isinstance(rule, six.string_types):
return _parse_text_rule(rule)
return _parse_list_rule(rule)
def register(name, func=None):
"""
Register a function or Check class as a policy check.
"""Register a function or Check class as a policy check.
:param name: Gives the name of the check type, e.g., 'rule',
'role', etc. If name is None, a default check type
@ -723,13 +855,11 @@ def register(name, func=None):
@register("rule")
class RuleCheck(Check):
def __call__(self, target, creds):
"""
Recursively checks credentials based on the defined rules.
"""
def __call__(self, target, creds, enforcer):
"""Recursively checks credentials based on the defined rules."""
try:
return _rules[self.match](target, creds)
return enforcer.rules[self.match](target, creds, enforcer)
except KeyError:
# We don't have any matching rule; fail closed
return False
@ -737,7 +867,7 @@ class RuleCheck(Check):
@register("role")
class RoleCheck(Check):
def __call__(self, target, creds):
def __call__(self, target, creds, enforcer):
"""Check that there is a matching role in the cred dict."""
return self.match.lower() in [x.lower() for x in creds['roles']]
@ -745,9 +875,8 @@ class RoleCheck(Check):
@register('http')
class HttpCheck(Check):
def __call__(self, target, creds):
"""
Check http: rules by calling to a remote server.
def __call__(self, target, creds, enforcer):
"""Check http: rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly 'True'.
@ -756,25 +885,40 @@ class HttpCheck(Check):
url = ('http:' + self.match) % target
data = {'target': jsonutils.dumps(target),
'credentials': jsonutils.dumps(creds)}
post_data = urllib.urlencode(data)
f = urllib2.urlopen(url, post_data)
post_data = urlparse.urlencode(data)
f = urlrequest.urlopen(url, post_data)
return f.read() == "True"
@register(None)
class GenericCheck(Check):
def __call__(self, target, creds):
"""
Check an individual match.
def __call__(self, target, creds, enforcer):
"""Check an individual match.
Matches look like:
tenant:%(tenant_id)s
role:compute:admin
True:%(user.enabled)s
'Member':%(role.name)s
"""
# TODO(termie): do dict inspection via dot syntax
try:
match = self.match % target
if self.kind in creds:
return match == six.text_type(creds[self.kind])
except KeyError:
# While doing GenericCheck if key not
# present in Target return false
return False
try:
# Try to interpret self.kind as a literal
leftval = ast.literal_eval(self.kind)
except ValueError:
try:
kind_parts = self.kind.split('.')
leftval = creds
for kind_part in kind_parts:
leftval = leftval[kind_part]
except KeyError:
return False
return match == six.text_type(leftval)

View File

@ -36,8 +36,8 @@ from neutron.openstack.common import policy
LOG = log.getLogger(__name__)
_POLICY_PATH = None
_POLICY_CACHE = {}
_ENFORCER = None
ADMIN_CTX_POLICY = 'context_is_admin'
ADVSVC_CTX_POLICY = 'context_is_advsvc'
# Maps deprecated 'extension' policies to new-style policies
@ -57,28 +57,32 @@ DEPRECATED_ACTION_MAP = {
'set': ['create', 'update']
}
cfg.CONF.import_opt('policy_file', 'neutron.common.config')
def reset():
global _POLICY_PATH
global _POLICY_CACHE
_POLICY_PATH = None
_POLICY_CACHE = {}
policy.reset()
global _ENFORCER
if _ENFORCER:
_ENFORCER.clear()
_ENFORCER = None
def init():
global _POLICY_PATH
global _POLICY_CACHE
if not _POLICY_PATH:
_POLICY_PATH = utils.find_config_file({}, cfg.CONF.policy_file)
if not _POLICY_PATH:
raise exceptions.PolicyFileNotFound(path=cfg.CONF.policy_file)
# pass _set_brain to read_cached_file so that the policy brain
# is reset only if the file has changed
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
reload_func=_set_rules)
"""Init an instance of the Enforcer class."""
global _ENFORCER
if not _ENFORCER:
_ENFORCER = policy.Enforcer()
# NOTE: Method _get_policy_path in common.policy can not always locate
# neutron policy file (when init() is called in tests),
# so set it explicitly.
_ENFORCER.policy_path = utils.find_config_file({},
cfg.CONF.policy_file)
_ENFORCER.load_rules(True)
def refresh():
"""Reset policy and init a new instance of Enforcer."""
reset()
init()
def get_resource_and_action(action):
@ -87,12 +91,17 @@ def get_resource_and_action(action):
return ("%ss" % data[-1], data[0] != 'get')
def _set_rules(data):
default_rule = 'default'
LOG.debug(_("Loading policies from file: %s"), _POLICY_PATH)
def set_rules(policies, overwrite=True):
"""Set rules based on the provided dict of rules.
:param policies: New policies to use. It should be an instance of dict.
:param overwrite: Whether to overwrite current rules or update them
with the new rules.
"""
LOG.debug("Loading policies from file: %s", _ENFORCER.policy_path)
# Ensure backward compatibility with folsom/grizzly convention
# for extension rules
policies = policy.Rules.load_json(data, default_rule)
for pol in policies.keys():
if any([pol.startswith(depr_pol) for depr_pol in
DEPRECATED_POLICY_MAP.keys()]):
@ -120,7 +129,8 @@ def _set_rules(data):
LOG.error(_LE("Backward compatibility unavailable for "
"deprecated policy %s. The policy will "
"not be enforced"), pol)
policy.set_rules(policies)
init()
_ENFORCER.set_rules(policies, overwrite)
def _is_attribute_explicitly_set(attribute_name, resource, target, action):
@ -253,7 +263,7 @@ class OwnerCheck(policy.Check):
reason=err_reason)
super(OwnerCheck, self).__init__(kind, match)
def __call__(self, target, creds):
def __call__(self, target, creds, enforcer):
if self.target_field not in target:
# policy needs a plugin check
# target field is in the form resource:field
@ -337,7 +347,7 @@ class FieldCheck(policy.Check):
self.field = field
self.value = conv_func(value)
def __call__(self, target_dict, cred_dict):
def __call__(self, target_dict, cred_dict, enforcer):
target_value = target_dict.get(self.field)
# target_value might be a boolean, explicitly compare with None
if target_value is None:
@ -376,9 +386,9 @@ def check(context, action, target, plugin=None, might_not_exist=False):
:return: Returns True if access is permitted else False.
"""
if might_not_exist and not (policy._rules and action in policy._rules):
if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
return policy.check(*(_prepare_check(context, action, target)))
return _ENFORCER.enforce(*(_prepare_check(context, action, target)))
def enforce(context, action, target, plugin=None):
@ -393,14 +403,16 @@ def enforce(context, action, target, plugin=None):
:param plugin: currently unused and deprecated.
Kept for backward compatibility.
:raises neutron.exceptions.PolicyNotAuthorized: if verification fails.
:raises neutron.openstack.common.policy.PolicyNotAuthorized:
if verification fails.
"""
rule, target, credentials = _prepare_check(context, action, target)
result = policy.check(rule, target, credentials, action=action)
if not result:
LOG.debug(_("Failed policy check for '%s'"), action)
raise exceptions.PolicyNotAuthorized(action=action)
try:
result = _ENFORCER.enforce(rule, target, credentials,
action=action, do_raise=True)
except policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception():
LOG.debug("Failed policy check for '%s'", action)
return result
@ -412,9 +424,9 @@ def check_is_admin(context):
target = credentials
# Backward compatibility: if ADMIN_CTX_POLICY is not
# found, default to validating role:admin
admin_policy = (ADMIN_CTX_POLICY if ADMIN_CTX_POLICY in policy._rules
admin_policy = (ADMIN_CTX_POLICY if ADMIN_CTX_POLICY in _ENFORCER.rules
else 'role:admin')
return policy.check(admin_policy, target, credentials)
return _ENFORCER.enforce(admin_policy, target, credentials)
def check_is_advsvc(context):
@ -425,16 +437,16 @@ def check_is_advsvc(context):
target = credentials
# Backward compatibility: if ADVSVC_CTX_POLICY is not
# found, default to validating role:advsvc
advsvc_policy = (ADVSVC_CTX_POLICY in policy._rules
advsvc_policy = (ADVSVC_CTX_POLICY in _ENFORCER.rules
and ADVSVC_CTX_POLICY or 'role:advsvc')
return policy.check(advsvc_policy, target, credentials)
return _ENFORCER.enforce(advsvc_policy, target, credentials)
def _extract_roles(rule, roles):
if isinstance(rule, policy.RoleCheck):
roles.append(rule.match.lower())
elif isinstance(rule, policy.RuleCheck):
_extract_roles(policy._rules[rule.match], roles)
_extract_roles(_ENFORCER.rules[rule.match], roles)
elif hasattr(rule, 'rules'):
for rule in rule.rules:
_extract_roles(rule, roles)
@ -451,10 +463,10 @@ def get_admin_roles():
# plugin modules. For backward compatibility it returns the literal
# admin if ADMIN_CTX_POLICY is not defined
init()
if not policy._rules or ADMIN_CTX_POLICY not in policy._rules:
if not _ENFORCER.rules or ADMIN_CTX_POLICY not in _ENFORCER.rules:
return ['admin']
try:
admin_ctx_rule = policy._rules[ADMIN_CTX_POLICY]
admin_ctx_rule = _ENFORCER.rules[ADMIN_CTX_POLICY]
except (KeyError, TypeError):
return
roles = []

View File

@ -31,10 +31,10 @@ class TestL3Sync(test_nuage_plugin.NuagePluginV2TestCase,
extraroute_test.ExtraRouteDBIntTestCase):
def setUp(self):
self.session = context.get_admin_context().session
super(TestL3Sync, self).setUp()
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
super(TestL3Sync, self).setUp()
self.session = context.get_admin_context().session
def _make_floatingip_for_tenant_port(self, net_id, port_id, tenant_id):
data = {'floatingip': {'floating_network_id': net_id,
@ -194,10 +194,10 @@ class TestL3Sync(test_nuage_plugin.NuagePluginV2TestCase,
class TestNetPartSync(test_netpartition.NetPartitionTestCase):
def setUp(self):
self.session = context.get_admin_context().session
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
super(TestNetPartSync, self).setUp()
self.session = context.get_admin_context().session
def test_net_partition_sync(self):
# If the net-partition exists in neutron and not in VSD,
@ -225,10 +225,10 @@ class TestNetPartSync(test_netpartition.NetPartitionTestCase):
class TestL2Sync(test_nuage_plugin.NuagePluginV2TestCase):
def setUp(self):
self.session = context.get_admin_context().session
super(TestL2Sync, self).setUp()
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
super(TestL2Sync, self).setUp()
self.session = context.get_admin_context().session
def test_subnet_sync(self):
# If the subnet exists in neutron and not in VSD,
@ -274,10 +274,10 @@ class TestL2Sync(test_nuage_plugin.NuagePluginV2TestCase):
class TestSecurityGroupSync(test_sg.TestSecurityGroups):
def setUp(self):
self.session = context.get_admin_context().session
self.syncmanager = sync.SyncManager(
test_nuage_plugin.getNuageClient())
super(TestSecurityGroupSync, self).setUp()
self.session = context.get_admin_context().session
def test_sg_get(self):
with self.security_group() as sg:

View File

@ -1028,14 +1028,13 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
tenant_id = _uuid()
# Inject rule in policy engine
policy.init()
common_policy._rules['get_network:name'] = common_policy.parse_rule(
"rule:admin_only")
self.addCleanup(policy.reset)
rules = {'get_network:name': common_policy.parse_rule(
"rule:admin_only")}
policy.set_rules(rules, overwrite=False)
res = self._test_get(tenant_id, tenant_id, 200)
res = self.deserialize(res)
try:
self.assertNotIn('name', res['network'])
finally:
del common_policy._rules['get_network:name']
def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):

View File

@ -15,11 +15,15 @@
"""Test of Policy Engine For Neutron"""
import StringIO
import urllib2
import fixtures
import mock
from oslo.config import cfg
from oslo.serialization import jsonutils
import six
import six.moves.urllib.request as urlrequest
import neutron
from neutron.api.v2 import attributes
@ -28,7 +32,6 @@ from neutron.common import exceptions
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron.tests import base
@ -37,33 +40,24 @@ from neutron.tests import base
class PolicyFileTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
policy.reset()
self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake', is_admin=False)
self.target = {'tenant_id': 'fake'}
self.tempdir = self.useFixture(fixtures.TempDir())
def test_modified_policy_reloads(self):
def fake_find_config_file(_1, _2):
return self.tempdir.join('policy')
with mock.patch.object(neutron.common.utils,
'find_config_file',
new=fake_find_config_file):
tmpfilename = fake_find_config_file(None, None)
tmpfilename = self.tempdir.join('policy')
action = "example:test"
with open(tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": ""}""")
policy.init()
cfg.CONF.set_override('policy_file', tmpfilename)
policy.refresh()
policy.enforce(self.context, action, self.target)
with open(tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": "!"}""")
# NOTE(vish): reset stored policy cache so we don't have to
# sleep(1)
policy._POLICY_CACHE = {}
policy.init()
policy.refresh()
self.target = {'tenant_id': 'fake_tenant'}
self.assertRaises(exceptions.PolicyNotAuthorized,
self.assertRaises(common_policy.PolicyNotAuthorized,
policy.enforce,
self.context,
action,
@ -73,10 +67,8 @@ class PolicyFileTestCase(base.BaseTestCase):
class PolicyTestCase(base.BaseTestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
policy.reset()
self.addCleanup(policy.reset)
# NOTE(vish): preload rules to circumvent reloading from file
policy.init()
rules = {
"true": '@',
"example:allowed": '@',
@ -88,21 +80,21 @@ class PolicyTestCase(base.BaseTestCase):
"example:lowercase_admin": "role:admin or role:sysadmin",
"example:uppercase_admin": "role:ADMIN or role:sysadmin",
}
policy.refresh()
# NOTE(vish): then overload underlying rules
common_policy.set_rules(common_policy.Rules(
dict((k, common_policy.parse_rule(v))
for k, v in rules.items())))
policy.set_rules(dict((k, common_policy.parse_rule(v))
for k, v in rules.items()))
self.context = context.Context('fake', 'fake', roles=['member'])
self.target = {}
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_enforce_bad_action_throws(self):
action = "example:denied"
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_check_bad_action_noraise(self):
@ -123,12 +115,9 @@ class PolicyTestCase(base.BaseTestCase):
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, True)
def test_enforce_http_true(self):
def fakeurlopen(url, post_data):
return six.StringIO("True")
with mock.patch.object(urllib2, 'urlopen', new=fakeurlopen):
@mock.patch.object(urlrequest, 'urlopen',
return_value=StringIO.StringIO("True"))
def test_enforce_http_true(self, mock_urlrequest):
action = "example:get_http"
target = {}
result = policy.enforce(self.context, action, target)
@ -142,20 +131,21 @@ class PolicyTestCase(base.BaseTestCase):
with mock.patch.object(urllib2, 'urlopen', new=fakeurlopen):
action = "example:get_http"
target = {}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
self.assertRaises(common_policy.PolicyNotAuthorized,
policy.enforce, self.context,
action, target)
def test_templatized_enforcement(self):
target_mine = {'tenant_id': 'fake'}
target_not_mine = {'tenant_id': 'another'}
action = "example:my_file"
policy.enforce(self.context, action, target_mine)
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target_not_mine)
def test_early_AND_enforcement(self):
action = "example:early_and_fail"
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, self.target)
def test_early_OR_enforcement(self):
@ -176,37 +166,27 @@ class DefaultPolicyTestCase(base.BaseTestCase):
def setUp(self):
super(DefaultPolicyTestCase, self).setUp()
policy.reset()
policy.init()
self.addCleanup(policy.reset)
self.tempdir = self.useFixture(fixtures.TempDir())
tmpfilename = self.tempdir.join('policy.json')
self.rules = {
"default": '',
"example:exist": '!',
}
self._set_rules('default')
with open(tmpfilename, "w") as policyfile:
jsonutils.dump(self.rules, policyfile)
cfg.CONF.set_override('policy_file', tmpfilename)
policy.refresh()
self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake')
def _set_rules(self, default_rule):
rules = common_policy.Rules(
dict((k, common_policy.parse_rule(v))
for k, v in self.rules.items()), default_rule)
common_policy.set_rules(rules)
def test_policy_called(self):
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, "example:exist", {})
def test_not_found_policy_calls_default(self):
policy.enforce(self.context, "example:noexist", {})
def test_default_not_found(self):
self._set_rules("default_noexist")
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, "example:noexist", {})
FAKE_RESOURCE_NAME = 'something'
FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
@ -225,8 +205,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def setUp(self):
super(NeutronPolicyTestCase, self).setUp()
policy.reset()
policy.init()
policy.refresh()
self.addCleanup(policy.reset)
self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
@ -251,6 +230,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"update_network:shared": "rule:admin_only",
"get_network": "rule:admin_or_owner or rule:shared or "
"rule:external or rule:context_is_advsvc",
"create_subnet": "rule:admin_or_network_owner",
"create_port:mac": "rule:admin_or_network_owner or "
"rule:context_is_advsvc",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
@ -267,8 +247,9 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"rule:shared"
}.items())
def fakepolicyinit():
common_policy.set_rules(common_policy.Rules(self.rules))
def fakepolicyinit(**kwargs):
enf = policy._ENFORCER
enf.set_rules(common_policy.Rules(self.rules))
def remove_fake_resource():
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
@ -314,22 +295,22 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_nonadmin_write_on_private_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', False,
exceptions.PolicyNotAuthorized)
common_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_private_fails(self):
self._test_nonadmin_action_on_attr('get', 'shared', False,
exceptions.PolicyNotAuthorized)
common_policy.PolicyNotAuthorized)
def test_nonadmin_write_on_shared_fails(self):
self._test_nonadmin_action_on_attr('create', 'shared', True,
exceptions.PolicyNotAuthorized)
common_policy.PolicyNotAuthorized)
def test_advsvc_get_network_works(self):
self._test_advsvc_action_on_attr('get', 'network', 'shared', False)
def test_advsvc_create_network_fails(self):
self._test_advsvc_action_on_attr('create', 'network', 'shared', False,
exceptions.PolicyNotAuthorized)
common_policy.PolicyNotAuthorized)
def test_advsvc_create_port_works(self):
self._test_advsvc_action_on_attr('create', 'port:mac', 'shared', False)
@ -347,7 +328,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_advsvc_create_subnet_fails(self):
self._test_advsvc_action_on_attr('create', 'subnet', 'shared', False,
exceptions.PolicyNotAuthorized)
common_policy.PolicyNotAuthorized)
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)
@ -370,7 +351,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_reset_adminonly_attr_to_default_fails(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_nonadmin_action_on_attr('update', 'shared', False,
exceptions.PolicyNotAuthorized,
common_policy.PolicyNotAuthorized,
**kwargs)
def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
@ -384,7 +365,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_enforce_adminonly_attribute_nonadminctx_returns_403(self):
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
def test_enforce_adminonly_nonadminctx_no_ctx_is_admin_policy_403(self):
@ -395,7 +376,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.admin_or_owner_legacy)
action = "create_network"
target = {'shared': True, 'tenant_id': 'somebody_else'}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target)
def _test_build_subattribute_match_rule(self, validate_value):
@ -436,7 +417,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
action = "create_something"
target = {'tenant_id': 'fake', 'attr': {'sub_attr_1': 'x',
'sub_attr_2': 'y'}}
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.assertRaises(common_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_enforce_regularuser_on_read(self):
@ -536,7 +517,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"some_other_rule": "role:admin",
}.items())
common_policy.set_rules(common_policy.Rules(rules))
policy.set_rules(common_policy.Rules(rules))
# 'admin' role is expected for bw compatibility
self.assertEqual(['admin'], policy.get_admin_roles())
@ -544,7 +525,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "role:admin",
}.items())
common_policy.set_rules(common_policy.Rules(rules))
policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_rule_check(self):
@ -552,7 +533,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
"some_other_rule": "role:admin",
}.items())
common_policy.set_rules(common_policy.Rules(rules))
policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_or_check(self):
@ -572,15 +553,15 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def _test_set_rules_with_deprecated_policy(self, input_rules,
expected_rules):
policy._set_rules(jsonutils.dumps(input_rules))
policy.set_rules(input_rules.copy())
# verify deprecated policy has been removed
for pol in input_rules.keys():
self.assertNotIn(pol, common_policy._rules)
self.assertNotIn(pol, policy._ENFORCER.rules)
# verify deprecated policy was correctly translated. Iterate
# over items for compatibility with unittest2 in python 2.6
for rule in expected_rules:
self.assertIn(rule, common_policy._rules)
self.assertEqual(str(common_policy._rules[rule]),
self.assertIn(rule, policy._ENFORCER.rules)
self.assertEqual(str(policy._ENFORCER.rules[rule]),
expected_rules[rule])
def test_set_rules_with_deprecated_view_policy(self):