Make use of private modules

Move the parser and checks logic into oslo_policy._parser and
oslo_policy._checks respectively. As a consequence, this allows us to
create separate test files for those modules so we now also have
oslo_policy.tests.test_parser and oslo_policy.tests.test_checks. Since
those modules needed some common classes and fixtures it was also
necessary to add oslo_policy.tests.base to service the three test
modules.

Change-Id: I656dcb8fda7b953f5def8ddfaa4d119a8c881965
This commit is contained in:
Ian Cordasco 2015-02-05 11:45:49 -06:00 committed by Steve Martinelli
parent c2872aa32f
commit df5d80759e
7 changed files with 1534 additions and 1411 deletions

310
oslo_policy/_checks.py Normal file
View File

@ -0,0 +1,310 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import ast
import copy
from oslo_serialization import jsonutils
import six
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urlrequest
registered_checks = {}
@six.add_metaclass(abc.ABCMeta)
class BaseCheck(object):
"""Abstract base class for Check classes."""
@abc.abstractmethod
def __str__(self):
"""String representation of the Check tree rooted at this node."""
pass
@abc.abstractmethod
def __call__(self, target, cred, enforcer):
"""Triggers if instance of the class is called.
Performs the check. Returns False to reject the access or a
true value (not necessary True) to accept the access.
"""
pass
class FalseCheck(BaseCheck):
"""A policy check that always returns ``False`` (disallow)."""
def __str__(self):
"""Return a string representation of this check."""
return "!"
def __call__(self, target, cred, enforcer):
"""Check the policy."""
return False
class TrueCheck(BaseCheck):
"""A policy check that always returns ``True`` (allow)."""
def __str__(self):
"""Return a string representation of this check."""
return "@"
def __call__(self, target, cred, enforcer):
"""Check the policy."""
return True
class Check(BaseCheck):
"""A base class to allow for user-defined policy checks.
:param kind: The kind of the check, i.e., the field before the ``:``.
:param match: The match of the check, i.e., the field after the ``:``.
"""
def __init__(self, kind, match):
self.kind = kind
self.match = match
def __str__(self):
"""Return a string representation of this check."""
return "%s:%s" % (self.kind, self.match)
class NotCheck(BaseCheck):
"""Implements the "not" logical operator.
A policy check that inverts the result of another policy check.
:param rule: The rule to negate. Must be a Check.
"""
def __init__(self, rule):
self.rule = rule
def __str__(self):
"""Return a string representation of this check."""
return "not %s" % self.rule
def __call__(self, target, cred, enforcer):
"""Check the policy.
Returns the logical inverse of the wrapped check.
"""
return not self.rule(target, cred, enforcer)
class AndCheck(BaseCheck):
"""Implements the "and" logical operator.
A policy check that requires that a list of other checks all return True.
:param list rules: rules that will be tested.
"""
def __init__(self, rules):
self.rules = rules
def __str__(self):
"""Return a string representation of this check."""
return "(%s)" % ' and '.join(str(r) for r in self.rules)
def __call__(self, target, cred, enforcer):
"""Check the policy.
Requires that all rules accept in order to return True.
"""
for rule in self.rules:
if not rule(target, cred, enforcer):
return False
return True
def add_check(self, rule):
"""Adds rule to be tested.
Allows addition of another rule to the list of rules that will
be tested.
:returns: self
:rtype: :class:`.AndCheck`
"""
self.rules.append(rule)
return self
class OrCheck(BaseCheck):
"""Implements the "or" operator.
A policy check that requires that at least one of a list of other
checks returns ``True``.
:param rules: A list of rules that will be tested.
"""
def __init__(self, rules):
self.rules = rules
def __str__(self):
"""Return a string representation of this check."""
return "(%s)" % ' or '.join(str(r) for r in self.rules)
def __call__(self, target, cred, enforcer):
"""Check the policy.
Requires that at least one rule accept in order to return True.
"""
for rule in self.rules:
if rule(target, cred, enforcer):
return True
return False
def add_check(self, rule):
"""Adds rule to be tested.
Allows addition of another rule to the list of rules that will
be tested. Returns the OrCheck object for convenience.
"""
self.rules.append(rule)
return self
def register(name, func=None):
"""Register a function or :class:`.Check` class as a policy check.
:param name: Gives the name of the check type, e.g., "rule",
"role", etc. If name is ``None``, a default check type
will be registered.
:param func: If given, provides the function or class to register.
If not given, returns a function taking one argument
to specify the function or class to register,
allowing use as a decorator.
"""
# Perform the actual decoration by registering the function or
# class. Returns the function or class for compliance with the
# decorator interface.
def decorator(func):
registered_checks[name] = func
return func
# If the function or class is given, do the registration
if func:
return decorator(func)
return decorator
@register("rule")
class RuleCheck(Check):
"""Recursively checks credentials based on the defined rules."""
def __call__(self, target, creds, enforcer):
try:
return enforcer.rules[self.match](target, creds, enforcer)
except KeyError:
# We don't have any matching rule; fail closed
return False
@register("role")
class RoleCheck(Check):
"""Check that there is a matching role in the ``creds`` dict."""
def __call__(self, target, creds, enforcer):
return self.match.lower() in [x.lower() for x in creds['roles']]
@register('http')
class HttpCheck(Check):
"""Check ``http:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer):
url = ('http:' + self.match) % target
# Convert instances of object() in target temporarily to
# empty dict to avoid circular reference detection
# errors in jsonutils.dumps().
temp_target = copy.deepcopy(target)
for key in target.keys():
element = target.get(key)
if type(element) is object:
temp_target[key] = {}
data = {'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
post_data = urlparse.urlencode(data)
f = urlrequest.urlopen(url, post_data)
return f.read() == "True"
@register(None)
class GenericCheck(Check):
"""Check an individual match.
Matches look like:
- tenant:%(tenant_id)s
- role:compute:admin
- True:%(user.enabled)s
- 'Member':%(role.name)s
"""
def __call__(self, target, creds, enforcer):
try:
match = self.match % target
except KeyError:
# While doing GenericCheck if key not
# present in Target return false
return False
try:
# Try to interpret self.kind as a literal
leftval = ast.literal_eval(self.kind)
except ValueError:
try:
kind_parts = self.kind.split('.')
leftval = creds
for kind_part in kind_parts:
leftval = leftval[kind_part]
except KeyError:
return False
return match == six.text_type(leftval)

341
oslo_policy/_parser.py Normal file
View File

@ -0,0 +1,341 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from oslo_log import log as logging
import six
from oslo_policy import _checks
from oslo_policy import _i18n
LOG = logging.getLogger(__name__)
_, _LE, _LI = _i18n._, _i18n._LE, _i18n._LI
def reducer(*tokens):
"""Decorator for reduction methods.
Arguments are a sequence of tokens, in order, which should trigger running
this reduction method.
"""
def decorator(func):
# Make sure we have a list of reducer sequences
if not hasattr(func, 'reducers'):
func.reducers = []
# Add the tokens to the list of reducer sequences
func.reducers.append(list(tokens))
return func
return decorator
class ParseStateMeta(type):
"""Metaclass for the :class:`.ParseState` class.
Facilitates identifying reduction methods.
"""
def __new__(mcs, name, bases, cls_dict):
"""Create the class.
Injects the 'reducers' list, a list of tuples matching token sequences
to the names of the corresponding reduction methods.
"""
reducers = []
for key, value in cls_dict.items():
if not hasattr(value, 'reducers'):
continue
for reduction in value.reducers:
reducers.append((reduction, key))
cls_dict['reducers'] = reducers
return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
@six.add_metaclass(ParseStateMeta)
class ParseState(object):
"""Implement the core of parsing the policy language.
Uses a greedy reduction algorithm to reduce a sequence of tokens into
a single terminal, the value of which will be the root of the
:class:`Check` tree.
.. note::
Error reporting is rather lacking. The best we can get with this
parser formulation is an overall "parse failed" error. Fortunately, the
policy language is simple enough that this shouldn't be that big a
problem.
"""
def __init__(self):
"""Initialize the ParseState."""
self.tokens = []
self.values = []
def reduce(self):
"""Perform a greedy reduction of the token stream.
If a reducer method matches, it will be executed, then the
:meth:`reduce` method will be called recursively to search for any more
possible reductions.
"""
for reduction, methname in self.reducers:
if (len(self.tokens) >= len(reduction) and
self.tokens[-len(reduction):] == reduction):
# Get the reduction method
meth = getattr(self, methname)
# Reduce the token stream
results = meth(*self.values[-len(reduction):])
# Update the tokens and values
self.tokens[-len(reduction):] = [r[0] for r in results]
self.values[-len(reduction):] = [r[1] for r in results]
# Check for any more reductions
return self.reduce()
def shift(self, tok, value):
"""Adds one more token to the state.
Calls :meth:`reduce`.
"""
self.tokens.append(tok)
self.values.append(value)
# Do a greedy reduce...
self.reduce()
@property
def result(self):
"""Obtain the final result of the parse.
:raises ValueError: If the parse failed to reduce to a single result.
"""
if len(self.values) != 1:
raise ValueError("Could not parse rule")
return self.values[0]
@reducer('(', 'check', ')')
@reducer('(', 'and_expr', ')')
@reducer('(', 'or_expr', ')')
def _wrap_check(self, _p1, check, _p2):
"""Turn parenthesized expressions into a 'check' token."""
return [('check', check)]
@reducer('check', 'and', 'check')
def _make_and_expr(self, check1, _and, check2):
"""Create an 'and_expr'.
Join two checks by the 'and' operator.
"""
return [('and_expr', _checks.AndCheck([check1, check2]))]
@reducer('and_expr', 'and', 'check')
def _extend_and_expr(self, and_expr, _and, check):
"""Extend an 'and_expr' by adding one more check."""
return [('and_expr', and_expr.add_check(check))]
@reducer('check', 'or', 'check')
def _make_or_expr(self, check1, _or, check2):
"""Create an 'or_expr'.
Join two checks by the 'or' operator.
"""
return [('or_expr', _checks.OrCheck([check1, check2]))]
@reducer('or_expr', 'or', 'check')
def _extend_or_expr(self, or_expr, _or, check):
"""Extend an 'or_expr' by adding one more check."""
return [('or_expr', or_expr.add_check(check))]
@reducer('not', 'check')
def _make_not_expr(self, _not, check):
"""Invert the result of another check."""
return [('check', _checks.NotCheck(check))]
def _parse_check(rule):
"""Parse a single base check rule into an appropriate Check object."""
# Handle the special checks
if rule == '!':
return _checks.FalseCheck()
elif rule == '@':
return _checks.TrueCheck()
try:
kind, match = rule.split(':', 1)
except Exception:
LOG.exception(_LE("Failed to understand rule %s") % rule)
# If the rule is invalid, we'll fail closed
return _checks.FalseCheck()
# Find what implements the check
if kind in _checks.registered_checks:
return _checks.registered_checks[kind](kind, match)
elif None in _checks.registered_checks:
return _checks.registered_checks[None](kind, match)
else:
LOG.error(_LE("No handler for matches of kind %s") % kind)
return _checks.FalseCheck()
def _parse_list_rule(rule):
"""Translates the old list-of-lists syntax into a tree of Check objects.
Provided for backwards compatibility.
"""
# Empty rule defaults to True
if not rule:
return _checks.TrueCheck()
# Outer list is joined by "or"; inner list by "and"
or_list = []
for inner_rule in rule:
# Elide empty inner lists
if not inner_rule:
continue
# Handle bare strings
if isinstance(inner_rule, six.string_types):
inner_rule = [inner_rule]
# Parse the inner rules into Check objects
and_list = [_parse_check(r) for r in inner_rule]
# Append the appropriate check to the or_list
if len(and_list) == 1:
or_list.append(and_list[0])
else:
or_list.append(_checks.AndCheck(and_list))
# If we have only one check, omit the "or"
if not or_list:
return _checks.FalseCheck()
elif len(or_list) == 1:
return or_list[0]
return _checks.OrCheck(or_list)
# Used for tokenizing the policy language
_tokenize_re = re.compile(r'\s+')
def _parse_tokenize(rule):
"""Tokenizer for the policy language.
Most of the single-character tokens are specified in the
_tokenize_re; however, parentheses need to be handled specially,
because they can appear inside a check string. Thankfully, those
parentheses that appear inside a check string can never occur at
the very beginning or end ("%(variable)s" is the correct syntax).
"""
for tok in _tokenize_re.split(rule):
# Skip empty tokens
if not tok or tok.isspace():
continue
# Handle leading parens on the token
clean = tok.lstrip('(')
for i in range(len(tok) - len(clean)):
yield '(', '('
# If it was only parentheses, continue
if not clean:
continue
else:
tok = clean
# Handle trailing parens on the token
clean = tok.rstrip(')')
trail = len(tok) - len(clean)
# Yield the cleaned token
lowered = clean.lower()
if lowered in ('and', 'or', 'not'):
# Special tokens
yield lowered, clean
elif clean:
# Not a special token, but not composed solely of ')'
if len(tok) >= 2 and ((tok[0], tok[-1]) in
[('"', '"'), ("'", "'")]):
# It's a quoted string
yield 'string', tok[1:-1]
else:
yield 'check', _parse_check(clean)
# Yield the trailing parens
for i in range(trail):
yield ')', ')'
def _parse_text_rule(rule):
"""Parses policy to the tree.
Translates a policy written in the policy language into a tree of
Check objects.
"""
# Empty rule means always accept
if not rule:
return _checks.TrueCheck()
# Parse the token stream
state = ParseState()
for tok, value in _parse_tokenize(rule):
state.shift(tok, value)
try:
return state.result
except ValueError:
# Couldn't parse the rule
LOG.exception(_LE("Failed to understand rule %s") % rule)
# Fail closed
return _checks.FalseCheck()
def parse_rule(rule):
"""Parses a policy rule into a tree of :class:`.Check` objects."""
# If the rule is a string, it's in the policy language
if isinstance(rule, six.string_types):
return _parse_text_rule(rule)
return _parse_list_rule(rule)

View File

@ -202,22 +202,19 @@ by setting the ``policy_default_rule`` configuration setting to the
desired rule name.
"""
import abc
import ast
import copy
import os
import re
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import six
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urlrequest
from oslo_policy._i18n import _, _LE, _LI
from oslo_policy import _checks
from oslo_policy import _i18n
from oslo_policy import _parser
from oslo_policy.openstack.common import fileutils
_, _LI = _i18n._, _i18n._LI
_opts = [
cfg.StrOpt('policy_file',
@ -243,7 +240,9 @@ _opts = [
LOG = logging.getLogger(__name__)
_checks = {}
register = _checks.register
BaseCheck = _checks.BaseCheck
Check = _checks.Check
class PolicyNotAuthorized(Exception):
@ -262,7 +261,7 @@ class Rules(dict):
"""Allow loading of JSON rule data."""
# Suck in the JSON data and parse the rules
rules = dict((k, _parse_rule(v)) for k, v in
rules = dict((k, _parser.parse_rule(v)) for k, v in
jsonutils.loads(data).items())
return cls(rules, default_rule)
@ -284,7 +283,7 @@ class Rules(dict):
if not self.default_rule:
raise KeyError(key)
if isinstance(self.default_rule, BaseCheck):
if isinstance(self.default_rule, _checks.BaseCheck):
return self.default_rule
# We need to check this or we can get infinite recursion
@ -301,7 +300,7 @@ class Rules(dict):
out_rules = {}
for key, value in self.items():
# Use empty string for singleton TrueCheck instances
if isinstance(value, TrueCheck):
if isinstance(value, _checks.TrueCheck):
out_rules[key] = ''
else:
out_rules[key] = str(value)
@ -460,7 +459,7 @@ class Enforcer(object):
self.load_rules()
# Allow the rule to be a Check tree
if isinstance(rule, BaseCheck):
if isinstance(rule, _checks.BaseCheck):
result = rule(target, creds, self)
elif not self.rules:
# No rules to reference means we're going to fail closed
@ -482,599 +481,3 @@ class Enforcer(object):
raise PolicyNotAuthorized(rule)
return result
@six.add_metaclass(abc.ABCMeta)
class BaseCheck(object):
"""Abstract base class for Check classes."""
@abc.abstractmethod
def __str__(self):
"""String representation of the Check tree rooted at this node."""
pass
@abc.abstractmethod
def __call__(self, target, cred, enforcer):
"""Triggers if instance of the class is called.
Performs the check. Returns False to reject the access or a
true value (not necessary True) to accept the access.
"""
pass
class FalseCheck(BaseCheck):
"""A policy check that always returns ``False`` (disallow)."""
def __str__(self):
"""Return a string representation of this check."""
return "!"
def __call__(self, target, cred, enforcer):
"""Check the policy."""
return False
class TrueCheck(BaseCheck):
"""A policy check that always returns ``True`` (allow)."""
def __str__(self):
"""Return a string representation of this check."""
return "@"
def __call__(self, target, cred, enforcer):
"""Check the policy."""
return True
class Check(BaseCheck):
"""A base class to allow for user-defined policy checks.
:param kind: The kind of the check, i.e., the field before the ``:``.
:param match: The match of the check, i.e., the field after the ``:``.
"""
def __init__(self, kind, match):
self.kind = kind
self.match = match
def __str__(self):
"""Return a string representation of this check."""
return "%s:%s" % (self.kind, self.match)
class NotCheck(BaseCheck):
"""Implements the "not" logical operator.
A policy check that inverts the result of another policy check.
:param rule: The rule to negate. Must be a Check.
"""
def __init__(self, rule):
self.rule = rule
def __str__(self):
"""Return a string representation of this check."""
return "not %s" % self.rule
def __call__(self, target, cred, enforcer):
"""Check the policy.
Returns the logical inverse of the wrapped check.
"""
return not self.rule(target, cred, enforcer)
class AndCheck(BaseCheck):
"""Implements the "and" logical operator.
A policy check that requires that a list of other checks all return True.
:param list rules: rules that will be tested.
"""
def __init__(self, rules):
self.rules = rules
def __str__(self):
"""Return a string representation of this check."""
return "(%s)" % ' and '.join(str(r) for r in self.rules)
def __call__(self, target, cred, enforcer):
"""Check the policy.
Requires that all rules accept in order to return True.
"""
for rule in self.rules:
if not rule(target, cred, enforcer):
return False
return True
def add_check(self, rule):
"""Adds rule to be tested.
Allows addition of another rule to the list of rules that will
be tested.
:returns: self
:rtype: :class:`.AndCheck`
"""
self.rules.append(rule)
return self
class OrCheck(BaseCheck):
"""Implements the "or" operator.
A policy check that requires that at least one of a list of other
checks returns ``True``.
:param rules: A list of rules that will be tested.
"""
def __init__(self, rules):
self.rules = rules
def __str__(self):
"""Return a string representation of this check."""
return "(%s)" % ' or '.join(str(r) for r in self.rules)
def __call__(self, target, cred, enforcer):
"""Check the policy.
Requires that at least one rule accept in order to return True.
"""
for rule in self.rules:
if rule(target, cred, enforcer):
return True
return False
def add_check(self, rule):
"""Adds rule to be tested.
Allows addition of another rule to the list of rules that will
be tested. Returns the OrCheck object for convenience.
"""
self.rules.append(rule)
return self
def _parse_check(rule):
"""Parse a single base check rule into an appropriate Check object."""
# Handle the special checks
if rule == '!':
return FalseCheck()
elif rule == '@':
return TrueCheck()
try:
kind, match = rule.split(':', 1)
except Exception:
LOG.exception(_LE("Failed to understand rule %s") % rule)
# If the rule is invalid, we'll fail closed
return FalseCheck()
# Find what implements the check
if kind in _checks:
return _checks[kind](kind, match)
elif None in _checks:
return _checks[None](kind, match)
else:
LOG.error(_LE("No handler for matches of kind %s") % kind)
return FalseCheck()
def _parse_list_rule(rule):
"""Translates the old list-of-lists syntax into a tree of Check objects.
Provided for backwards compatibility.
"""
# Empty rule defaults to True
if not rule:
return TrueCheck()
# Outer list is joined by "or"; inner list by "and"
or_list = []
for inner_rule in rule:
# Elide empty inner lists
if not inner_rule:
continue
# Handle bare strings
if isinstance(inner_rule, six.string_types):
inner_rule = [inner_rule]
# Parse the inner rules into Check objects
and_list = [_parse_check(r) for r in inner_rule]
# Append the appropriate check to the or_list
if len(and_list) == 1:
or_list.append(and_list[0])
else:
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
if not or_list:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
return OrCheck(or_list)
# Used for tokenizing the policy language
_tokenize_re = re.compile(r'\s+')
def _parse_tokenize(rule):
"""Tokenizer for the policy language.
Most of the single-character tokens are specified in the
_tokenize_re; however, parentheses need to be handled specially,
because they can appear inside a check string. Thankfully, those
parentheses that appear inside a check string can never occur at
the very beginning or end ("%(variable)s" is the correct syntax).
"""
for tok in _tokenize_re.split(rule):
# Skip empty tokens
if not tok or tok.isspace():
continue
# Handle leading parens on the token
clean = tok.lstrip('(')
for i in range(len(tok) - len(clean)):
yield '(', '('
# If it was only parentheses, continue
if not clean:
continue
else:
tok = clean
# Handle trailing parens on the token
clean = tok.rstrip(')')
trail = len(tok) - len(clean)
# Yield the cleaned token
lowered = clean.lower()
if lowered in ('and', 'or', 'not'):
# Special tokens
yield lowered, clean
elif clean:
# Not a special token, but not composed solely of ')'
if len(tok) >= 2 and ((tok[0], tok[-1]) in
[('"', '"'), ("'", "'")]):
# It's a quoted string
yield 'string', tok[1:-1]
else:
yield 'check', _parse_check(clean)
# Yield the trailing parens
for i in range(trail):
yield ')', ')'
class _ParseStateMeta(type):
"""Metaclass for the :class:`.ParseState` class.
Facilitates identifying reduction methods.
"""
def __new__(mcs, name, bases, cls_dict):
"""Create the class.
Injects the 'reducers' list, a list of tuples matching token sequences
to the names of the corresponding reduction methods.
"""
reducers = []
for key, value in cls_dict.items():
if not hasattr(value, 'reducers'):
continue
for reduction in value.reducers:
reducers.append((reduction, key))
cls_dict['reducers'] = reducers
return super(_ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
def reducer(*tokens):
"""Decorator for reduction methods.
Arguments are a sequence of tokens, in order, which should trigger running
this reduction method.
"""
def decorator(func):
# Make sure we have a list of reducer sequences
if not hasattr(func, 'reducers'):
func.reducers = []
# Add the tokens to the list of reducer sequences
func.reducers.append(list(tokens))
return func
return decorator
@six.add_metaclass(_ParseStateMeta)
class _ParseState(object):
"""Implement the core of parsing the policy language.
Uses a greedy reduction algorithm to reduce a sequence of tokens into
a single terminal, the value of which will be the root of the
:class:`Check` tree.
.. note::
Error reporting is rather lacking. The best we can get with this
parser formulation is an overall "parse failed" error. Fortunately, the
policy language is simple enough that this shouldn't be that big a
problem.
"""
def __init__(self):
"""Initialize the _ParseState."""
self.tokens = []
self.values = []
def reduce(self):
"""Perform a greedy reduction of the token stream.
If a reducer method matches, it will be executed, then the
:meth:`reduce` method will be called recursively to search for any more
possible reductions.
"""
for reduction, methname in self.reducers:
if (len(self.tokens) >= len(reduction) and
self.tokens[-len(reduction):] == reduction):
# Get the reduction method
meth = getattr(self, methname)
# Reduce the token stream
results = meth(*self.values[-len(reduction):])
# Update the tokens and values
self.tokens[-len(reduction):] = [r[0] for r in results]
self.values[-len(reduction):] = [r[1] for r in results]
# Check for any more reductions
return self.reduce()
def shift(self, tok, value):
"""Adds one more token to the state.
Calls :meth:`reduce`.
"""
self.tokens.append(tok)
self.values.append(value)
# Do a greedy reduce...
self.reduce()
@property
def result(self):
"""Obtain the final result of the parse.
:raises ValueError: If the parse failed to reduce to a single result.
"""
if len(self.values) != 1:
raise ValueError("Could not parse rule")
return self.values[0]
@reducer('(', 'check', ')')
@reducer('(', 'and_expr', ')')
@reducer('(', 'or_expr', ')')
def _wrap_check(self, _p1, check, _p2):
"""Turn parenthesized expressions into a 'check' token."""
return [('check', check)]
@reducer('check', 'and', 'check')
def _make_and_expr(self, check1, _and, check2):
"""Create an 'and_expr'.
Join two checks by the 'and' operator.
"""
return [('and_expr', AndCheck([check1, check2]))]
@reducer('and_expr', 'and', 'check')
def _extend_and_expr(self, and_expr, _and, check):
"""Extend an 'and_expr' by adding one more check."""
return [('and_expr', and_expr.add_check(check))]
@reducer('check', 'or', 'check')
def _make_or_expr(self, check1, _or, check2):
"""Create an 'or_expr'.
Join two checks by the 'or' operator.
"""
return [('or_expr', OrCheck([check1, check2]))]
@reducer('or_expr', 'or', 'check')
def _extend_or_expr(self, or_expr, _or, check):
"""Extend an 'or_expr' by adding one more check."""
return [('or_expr', or_expr.add_check(check))]
@reducer('not', 'check')
def _make_not_expr(self, _not, check):
"""Invert the result of another check."""
return [('check', NotCheck(check))]
def _parse_text_rule(rule):
"""Parses policy to the tree.
Translates a policy written in the policy language into a tree of
Check objects.
"""
# Empty rule means always accept
if not rule:
return TrueCheck()
# Parse the token stream
state = _ParseState()
for tok, value in _parse_tokenize(rule):
state.shift(tok, value)
try:
return state.result
except ValueError:
# Couldn't parse the rule
LOG.exception(_LE("Failed to understand rule %s") % rule)
# Fail closed
return FalseCheck()
def _parse_rule(rule):
"""Parses a policy rule into a tree of :class:`.Check` objects."""
# If the rule is a string, it's in the policy language
if isinstance(rule, six.string_types):
return _parse_text_rule(rule)
return _parse_list_rule(rule)
def register(name, func=None):
"""Register a function or :class:`.Check` class as a policy check.
:param name: Gives the name of the check type, e.g., "rule",
"role", etc. If name is ``None``, a default check type
will be registered.
:param func: If given, provides the function or class to register.
If not given, returns a function taking one argument
to specify the function or class to register,
allowing use as a decorator.
"""
# Perform the actual decoration by registering the function or
# class. Returns the function or class for compliance with the
# decorator interface.
def decorator(func):
_checks[name] = func
return func
# If the function or class is given, do the registration
if func:
return decorator(func)
return decorator
@register("rule")
class RuleCheck(Check):
"""Recursively checks credentials based on the defined rules."""
def __call__(self, target, creds, enforcer):
try:
return enforcer.rules[self.match](target, creds, enforcer)
except KeyError:
# We don't have any matching rule; fail closed
return False
@register("role")
class RoleCheck(Check):
"""Check that there is a matching role in the ``creds`` dict."""
def __call__(self, target, creds, enforcer):
return self.match.lower() in [x.lower() for x in creds['roles']]
@register('http')
class HttpCheck(Check):
"""Check ``http:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer):
url = ('http:' + self.match) % target
# Convert instances of object() in target temporarily to
# empty dict to avoid circular reference detection
# errors in jsonutils.dumps().
temp_target = copy.deepcopy(target)
for key in target.keys():
element = target.get(key)
if type(element) is object:
temp_target[key] = {}
data = {'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
post_data = urlparse.urlencode(data)
f = urlrequest.urlopen(url, post_data)
return f.read() == "True"
@register(None)
class GenericCheck(Check):
"""Check an individual match.
Matches look like:
- tenant:%(tenant_id)s
- role:compute:admin
- True:%(user.enabled)s
- 'Member':%(role.name)s
"""
def __call__(self, target, creds, enforcer):
try:
match = self.match % target
except KeyError:
# While doing GenericCheck if key not
# present in Target return false
return False
try:
# Try to interpret self.kind as a literal
leftval = ast.literal_eval(self.kind)
except ValueError:
try:
kind_parts = self.kind.split('.')
leftval = creds
for kind_part in kind_parts:
leftval = leftval[kind_part]
except KeyError:
return False
return match == six.text_type(leftval)

52
oslo_policy/tests/base.py Normal file
View File

@ -0,0 +1,52 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
from oslo_concurrency.fixture import lockutils
from oslo_config import cfg
from oslo_config import fixture as config
from oslotest import base as test_base
from oslo_policy import policy
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..', 'tests/var'))
ENFORCER = policy.Enforcer(cfg.CONF)
class PolicyBaseTestCase(test_base.BaseTestCase):
def setUp(self):
super(PolicyBaseTestCase, self).setUp()
# NOTE(bnemec): Many of these tests use the same ENFORCER object, so
# I believe we need to serialize them.
self.useFixture(lockutils.LockFixture('policy-lock'))
self.CONF = self.useFixture(config.Config()).conf
self.CONF(args=['--config-dir', TEST_VAR_DIR])
self.enforcer = ENFORCER
self.addCleanup(self.enforcer.clear)
class FakeCheck(policy.BaseCheck):
def __init__(self, result=None):
self.result = result
def __str__(self):
return str(self.result)
def __call__(self, target, creds, enforcer):
if self.result is not None:
return self.result
return (target, creds, enforcer)

View File

@ -0,0 +1,402 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_serialization import jsonutils
from oslotest import base as test_base
import six
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urlrequest
from oslo_policy import _checks
from oslo_policy import policy
from oslo_policy.tests import base
ENFORCER = base.ENFORCER
class CheckRegisterTestCase(test_base.BaseTestCase):
@mock.patch.object(_checks, 'registered_checks', {})
def test_register_check(self):
class TestCheck(_checks.Check):
pass
policy.register('spam', TestCheck)
self.assertEqual(_checks.registered_checks, dict(spam=TestCheck))
@mock.patch.object(_checks, 'registered_checks', {})
def test_register_check_decorator(self):
@policy.register('spam')
class TestCheck(_checks.Check):
pass
self.assertEqual(_checks.registered_checks, dict(spam=TestCheck))
class RuleCheckTestCase(test_base.BaseTestCase):
@mock.patch.object(ENFORCER, 'rules', {})
def test_rule_missing(self):
check = _checks.RuleCheck('rule', 'spam')
self.assertEqual(check('target', 'creds', ENFORCER), False)
@mock.patch.object(ENFORCER, 'rules',
dict(spam=mock.Mock(return_value=False)))
def test_rule_false(self):
enforcer = ENFORCER
check = _checks.RuleCheck('rule', 'spam')
self.assertEqual(check('target', 'creds', enforcer), False)
enforcer.rules['spam'].assert_called_once_with('target', 'creds',
enforcer)
@mock.patch.object(ENFORCER, 'rules',
dict(spam=mock.Mock(return_value=True)))
def test_rule_true(self):
enforcer = ENFORCER
check = _checks.RuleCheck('rule', 'spam')
self.assertEqual(check('target', 'creds', enforcer), True)
enforcer.rules['spam'].assert_called_once_with('target', 'creds',
enforcer)
class RoleCheckTestCase(base.PolicyBaseTestCase):
def test_accept(self):
check = _checks.RoleCheck('role', 'sPaM')
self.assertEqual(check('target', dict(roles=['SpAm']),
self.enforcer), True)
def test_reject(self):
check = _checks.RoleCheck('role', 'spam')
self.assertEqual(check('target', dict(roles=[]), self.enforcer), False)
class HttpCheckTestCase(base.PolicyBaseTestCase):
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
return result
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('True'))
def test_accept(self, mock_urlopen):
check = _checks.HttpCheck('http', '//example.com/%(name)s')
self.assertEqual(check(dict(name='target', spam='spammer'),
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
True)
self.assertEqual(mock_urlopen.call_count, 1)
args = mock_urlopen.call_args[0]
self.assertEqual(args[0], 'http://example.com/target')
self.assertEqual(self.decode_post_data(args[1]), dict(
target=dict(name='target', spam='spammer'),
credentials=dict(user='user', roles=['a', 'b', 'c']),
))
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('other'))
def test_reject(self, mock_urlopen):
check = _checks.HttpCheck('http', '//example.com/%(name)s')
self.assertEqual(check(dict(name='target', spam='spammer'),
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
False)
self.assertEqual(mock_urlopen.call_count, 1)
args = mock_urlopen.call_args[0]
self.assertEqual(args[0], 'http://example.com/target')
self.assertEqual(self.decode_post_data(args[1]), dict(
target=dict(name='target', spam='spammer'),
credentials=dict(user='user', roles=['a', 'b', 'c']),
))
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('True'))
def test_http_with_objects_in_target(self, mock_urlopen):
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target = {'a': object(),
'name': 'target',
'b': 'test data'}
self.assertEqual(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
True)
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('True'))
def test_http_with_strings_in_target(self, mock_urlopen):
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target = {'a': 'some_string',
'name': 'target',
'b': 'test data'}
self.assertEqual(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
True)
class GenericCheckTestCase(base.PolicyBaseTestCase):
def test_no_cred(self):
check = _checks.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False)
def test_cred_mismatch(self):
check = _checks.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name='spam'),
dict(name='ham'),
self.enforcer), False)
def test_accept(self):
check = _checks.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name='spam'),
dict(name='spam'),
self.enforcer), True)
def test_no_key_match_in_target(self):
check = _checks.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name1='spam'),
dict(name='spam'),
self.enforcer), False)
def test_constant_string_mismatch(self):
check = _checks.GenericCheck("'spam'", '%(name)s')
self.assertEqual(check(dict(name='ham'),
{},
self.enforcer), False)
def test_constant_string_accept(self):
check = _checks.GenericCheck("'spam'", '%(name)s')
self.assertEqual(check(dict(name='spam'),
{},
self.enforcer), True)
def test_constant_literal_mismatch(self):
check = _checks.GenericCheck("True", '%(enabled)s')
self.assertEqual(check(dict(enabled=False),
{},
self.enforcer), False)
def test_constant_literal_accept(self):
check = _checks.GenericCheck("True", '%(enabled)s')
self.assertEqual(check(dict(enabled=True),
{},
self.enforcer), True)
def test_deep_credentials_dictionary_lookup(self):
check = _checks.GenericCheck("a.b.c.d", 'APPLES')
credentials = {'a': {'b': {'c': {'d': 'APPLES'}}}}
self.assertEqual(check({},
credentials,
self.enforcer), True)
def test_missing_credentials_dictionary_lookup(self):
credentials = {'a': 'APPLES', 'o': {'t': 'ORANGES'}}
# First a valid check - rest of case is expecting failures
# Should prove the basic credentials structure before we test
# for failure cases.
check = _checks.GenericCheck("o.t", 'ORANGES')
self.assertEqual(check({},
credentials,
self.enforcer), True)
# Case where final key is missing
check = _checks.GenericCheck("o.v", 'ORANGES')
self.assertEqual(check({},
credentials,
self.enforcer), False)
# Attempt to access key under a missing dictionary
check = _checks.GenericCheck("q.v", 'APPLES')
self.assertEqual(check({},
credentials,
self.enforcer), False)
class FalseCheckTestCase(test_base.BaseTestCase):
def test_str(self):
check = _checks.FalseCheck()
self.assertEqual(str(check), '!')
def test_call(self):
check = _checks.FalseCheck()
self.assertEqual(check('target', 'creds', None), False)
class TrueCheckTestCase(test_base.BaseTestCase):
def test_str(self):
check = _checks.TrueCheck()
self.assertEqual(str(check), '@')
def test_call(self):
check = _checks.TrueCheck()
self.assertEqual(check('target', 'creds', None), True)
class CheckForTest(_checks.Check):
def __call__(self, target, creds, enforcer):
pass
class CheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = CheckForTest('kind', 'match')
self.assertEqual(check.kind, 'kind')
self.assertEqual(check.match, 'match')
def test_str(self):
check = CheckForTest('kind', 'match')
self.assertEqual(str(check), 'kind:match')
class NotCheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = _checks.NotCheck('rule')
self.assertEqual(check.rule, 'rule')
def test_str(self):
check = _checks.NotCheck('rule')
self.assertEqual(str(check), 'not rule')
def test_call_true(self):
rule = mock.Mock(return_value=True)
check = _checks.NotCheck(rule)
self.assertEqual(check('target', 'cred', None), False)
rule.assert_called_once_with('target', 'cred', None)
def test_call_false(self):
rule = mock.Mock(return_value=False)
check = _checks.NotCheck(rule)
self.assertEqual(check('target', 'cred', None), True)
rule.assert_called_once_with('target', 'cred', None)
class AndCheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = _checks.AndCheck(['rule1', 'rule2'])
self.assertEqual(check.rules, ['rule1', 'rule2'])
def test_add_check(self):
check = _checks.AndCheck(['rule1', 'rule2'])
check.add_check('rule3')
self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
def test_str(self):
check = _checks.AndCheck(['rule1', 'rule2'])
self.assertEqual(str(check), '(rule1 and rule2)')
def test_call_all_false(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
check = _checks.AndCheck(rules)
self.assertEqual(check('target', 'cred', None), False)
rules[0].assert_called_once_with('target', 'cred', None)
self.assertFalse(rules[1].called)
def test_call_first_true(self):
rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
check = _checks.AndCheck(rules)
self.assertFalse(check('target', 'cred', None))
rules[0].assert_called_once_with('target', 'cred', None)
rules[1].assert_called_once_with('target', 'cred', None)
def test_call_second_true(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
check = _checks.AndCheck(rules)
self.assertFalse(check('target', 'cred', None))
rules[0].assert_called_once_with('target', 'cred', None)
self.assertFalse(rules[1].called)
class OrCheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = _checks.OrCheck(['rule1', 'rule2'])
self.assertEqual(check.rules, ['rule1', 'rule2'])
def test_add_check(self):
check = _checks.OrCheck(['rule1', 'rule2'])
check.add_check('rule3')
self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
def test_str(self):
check = _checks.OrCheck(['rule1', 'rule2'])
self.assertEqual(str(check), '(rule1 or rule2)')
def test_call_all_false(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
check = _checks.OrCheck(rules)
self.assertEqual(check('target', 'cred', None), False)
rules[0].assert_called_once_with('target', 'cred', None)
rules[1].assert_called_once_with('target', 'cred', None)
def test_call_first_true(self):
rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
check = _checks.OrCheck(rules)
self.assertEqual(check('target', 'cred', None), True)
rules[0].assert_called_once_with('target', 'cred', None)
self.assertFalse(rules[1].called)
def test_call_second_true(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
check = _checks.OrCheck(rules)
self.assertEqual(check('target', 'cred', None), True)
rules[0].assert_called_once_with('target', 'cred', None)
rules[1].assert_called_once_with('target', 'cred', None)

View File

@ -0,0 +1,403 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslotest import base as test_base
import six
from oslo_policy import _checks
from oslo_policy import _parser
from oslo_policy.tests import base
class ParseCheckTestCase(test_base.BaseTestCase):
def test_false(self):
result = _parser._parse_check('!')
self.assertTrue(isinstance(result, _checks.FalseCheck))
def test_true(self):
result = _parser._parse_check('@')
self.assertTrue(isinstance(result, _checks.TrueCheck))
def test_bad_rule(self):
result = _parser._parse_check('foobar')
self.assertTrue(isinstance(result, _checks.FalseCheck))
@mock.patch.object(_checks, 'registered_checks', {})
def test_no_handler(self):
result = _parser._parse_check('no:handler')
self.assertTrue(isinstance(result, _checks.FalseCheck))
@mock.patch.object(_checks, 'registered_checks', {
'spam': mock.Mock(return_value="spam_check"),
None: mock.Mock(return_value="none_check"),
})
def test_check(self):
result = _parser._parse_check('spam:handler')
self.assertEqual(result, 'spam_check')
_checks.registered_checks['spam'].assert_called_once_with('spam',
'handler')
self.assertFalse(_checks.registered_checks[None].called)
@mock.patch.object(_checks, 'registered_checks', {
None: mock.Mock(return_value="none_check"),
})
def test_check_default(self):
result = _parser._parse_check('spam:handler')
self.assertEqual(result, 'none_check')
_checks.registered_checks[None].assert_called_once_with('spam',
'handler')
class ParseListRuleTestCase(test_base.BaseTestCase):
def test_empty(self):
result = _parser._parse_list_rule([])
self.assertTrue(isinstance(result, _checks.TrueCheck))
self.assertEqual(str(result), '@')
@mock.patch.object(_parser, '_parse_check', base.FakeCheck)
def test_oneele_zeroele(self):
result = _parser._parse_list_rule([[]])
self.assertTrue(isinstance(result, _checks.FalseCheck))
self.assertEqual(str(result), '!')
@mock.patch.object(_parser, '_parse_check', base.FakeCheck)
def test_oneele_bare(self):
result = _parser._parse_list_rule(['rule'])
self.assertTrue(isinstance(result, base.FakeCheck))
self.assertEqual(result.result, 'rule')
self.assertEqual(str(result), 'rule')
@mock.patch.object(_parser, '_parse_check', base.FakeCheck)
def test_oneele_oneele(self):
result = _parser._parse_list_rule([['rule']])
self.assertTrue(isinstance(result, base.FakeCheck))
self.assertEqual(result.result, 'rule')
self.assertEqual(str(result), 'rule')
@mock.patch.object(_parser, '_parse_check', base.FakeCheck)
def test_oneele_multi(self):
result = _parser._parse_list_rule([['rule1', 'rule2']])
self.assertTrue(isinstance(result, _checks.AndCheck))
self.assertEqual(len(result.rules), 2)
for i, value in enumerate(['rule1', 'rule2']):
self.assertTrue(isinstance(result.rules[i], base.FakeCheck))
self.assertEqual(result.rules[i].result, value)
self.assertEqual(str(result), '(rule1 and rule2)')
@mock.patch.object(_parser, '_parse_check', base.FakeCheck)
def test_multi_oneele(self):
result = _parser._parse_list_rule([['rule1'], ['rule2']])
self.assertTrue(isinstance(result, _checks.OrCheck))
self.assertEqual(len(result.rules), 2)
for i, value in enumerate(['rule1', 'rule2']):
self.assertTrue(isinstance(result.rules[i], base.FakeCheck))
self.assertEqual(result.rules[i].result, value)
self.assertEqual(str(result), '(rule1 or rule2)')
@mock.patch.object(_parser, '_parse_check', base.FakeCheck)
def test_multi_multi(self):
result = _parser._parse_list_rule([['rule1', 'rule2'],
['rule3', 'rule4']])
self.assertTrue(isinstance(result, _checks.OrCheck))
self.assertEqual(len(result.rules), 2)
for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]):
self.assertTrue(isinstance(result.rules[i], _checks.AndCheck))
self.assertEqual(len(result.rules[i].rules), 2)
for j, value in enumerate(values):
self.assertTrue(isinstance(result.rules[i].rules[j],
base.FakeCheck))
self.assertEqual(result.rules[i].rules[j].result, value)
self.assertEqual(str(result),
'((rule1 and rule2) or (rule3 and rule4))')
class ParseTokenizeTestCase(test_base.BaseTestCase):
@mock.patch.object(_parser, '_parse_check', lambda x: x)
def test_tokenize(self):
exemplar = ("(( ( ((() And)) or ) (check:%(miss)s) not)) "
"'a-string' \"another-string\"")
expected = [
('(', '('), ('(', '('), ('(', '('), ('(', '('), ('(', '('),
('(', '('), (')', ')'), ('and', 'And'),
(')', ')'), (')', ')'), ('or', 'or'), (')', ')'), ('(', '('),
('check', 'check:%(miss)s'), (')', ')'), ('not', 'not'),
(')', ')'), (')', ')'),
('string', 'a-string'),
('string', 'another-string'),
]
result = list(_parser._parse_tokenize(exemplar))
self.assertEqual(result, expected)
class ParseStateMetaTestCase(test_base.BaseTestCase):
def test_reducer(self):
@_parser.reducer('a', 'b', 'c')
@_parser.reducer('d', 'e', 'f')
def spam():
pass
self.assertTrue(hasattr(spam, 'reducers'))
self.assertEqual(spam.reducers, [['d', 'e', 'f'], ['a', 'b', 'c']])
def test_parse_state_meta(self):
@six.add_metaclass(_parser.ParseStateMeta)
class FakeState(object):
@_parser.reducer('a', 'b', 'c')
@_parser.reducer('d', 'e', 'f')
def reduce1(self):
pass
@_parser.reducer('g', 'h', 'i')
def reduce2(self):
pass
self.assertTrue(hasattr(FakeState, 'reducers'))
for reduction, reducer in FakeState.reducers:
if (reduction == ['a', 'b', 'c'] or
reduction == ['d', 'e', 'f']):
self.assertEqual(reducer, 'reduce1')
elif reduction == ['g', 'h', 'i']:
self.assertEqual(reducer, 'reduce2')
else:
self.fail("Unrecognized reducer discovered")
class ParseStateTestCase(test_base.BaseTestCase):
def test_init(self):
state = _parser.ParseState()
self.assertEqual(state.tokens, [])
self.assertEqual(state.values, [])
@mock.patch.object(_parser.ParseState, 'reducers', [(['tok1'], 'meth')])
@mock.patch.object(_parser.ParseState, 'meth', create=True)
def test_reduce_none(self, mock_meth):
state = _parser.ParseState()
state.tokens = ['tok2']
state.values = ['val2']
state.reduce()
self.assertEqual(state.tokens, ['tok2'])
self.assertEqual(state.values, ['val2'])
self.assertFalse(mock_meth.called)
@mock.patch.object(_parser.ParseState, 'reducers',
[(['tok1', 'tok2'], 'meth')])
@mock.patch.object(_parser.ParseState, 'meth', create=True)
def test_reduce_short(self, mock_meth):
state = _parser.ParseState()
state.tokens = ['tok1']
state.values = ['val1']
state.reduce()
self.assertEqual(state.tokens, ['tok1'])
self.assertEqual(state.values, ['val1'])
self.assertFalse(mock_meth.called)
@mock.patch.object(_parser.ParseState, 'reducers',
[(['tok1', 'tok2'], 'meth')])
@mock.patch.object(_parser.ParseState, 'meth', create=True,
return_value=[('tok3', 'val3')])
def test_reduce_one(self, mock_meth):
state = _parser.ParseState()
state.tokens = ['tok1', 'tok2']
state.values = ['val1', 'val2']
state.reduce()
self.assertEqual(state.tokens, ['tok3'])
self.assertEqual(state.values, ['val3'])
mock_meth.assert_called_once_with('val1', 'val2')
@mock.patch.object(_parser.ParseState, 'reducers', [
(['tok1', 'tok4'], 'meth2'),
(['tok2', 'tok3'], 'meth1'),
])
@mock.patch.object(_parser.ParseState, 'meth1', create=True,
return_value=[('tok4', 'val4')])
@mock.patch.object(_parser.ParseState, 'meth2', create=True,
return_value=[('tok5', 'val5')])
def test_reduce_two(self, mock_meth2, mock_meth1):
state = _parser.ParseState()
state.tokens = ['tok1', 'tok2', 'tok3']
state.values = ['val1', 'val2', 'val3']
state.reduce()
self.assertEqual(state.tokens, ['tok5'])
self.assertEqual(state.values, ['val5'])
mock_meth1.assert_called_once_with('val2', 'val3')
mock_meth2.assert_called_once_with('val1', 'val4')
@mock.patch.object(_parser.ParseState, 'reducers',
[(['tok1', 'tok2'], 'meth')])
@mock.patch.object(_parser.ParseState, 'meth', create=True,
return_value=[('tok3', 'val3'), ('tok4', 'val4')])
def test_reduce_multi(self, mock_meth):
state = _parser.ParseState()
state.tokens = ['tok1', 'tok2']
state.values = ['val1', 'val2']
state.reduce()
self.assertEqual(state.tokens, ['tok3', 'tok4'])
self.assertEqual(state.values, ['val3', 'val4'])
mock_meth.assert_called_once_with('val1', 'val2')
def test_shift(self):
state = _parser.ParseState()
with mock.patch.object(_parser.ParseState, 'reduce') as mock_reduce:
state.shift('token', 'value')
self.assertEqual(state.tokens, ['token'])
self.assertEqual(state.values, ['value'])
mock_reduce.assert_called_once_with()
def test_result_empty(self):
state = _parser.ParseState()
self.assertRaises(ValueError, lambda: state.result)
def test_result_unreduced(self):
state = _parser.ParseState()
state.tokens = ['tok1', 'tok2']
state.values = ['val1', 'val2']
self.assertRaises(ValueError, lambda: state.result)
def test_result(self):
state = _parser.ParseState()
state.tokens = ['token']
state.values = ['value']
self.assertEqual(state.result, 'value')
def test_wrap_check(self):
state = _parser.ParseState()
result = state._wrap_check('(', 'the_check', ')')
self.assertEqual(result, [('check', 'the_check')])
@mock.patch.object(_checks, 'AndCheck', lambda x: x)
def test_make_and_expr(self):
state = _parser.ParseState()
result = state._make_and_expr('check1', 'and', 'check2')
self.assertEqual(result, [('and_expr', ['check1', 'check2'])])
def test_extend_and_expr(self):
state = _parser.ParseState()
mock_expr = mock.Mock()
mock_expr.add_check.return_value = 'newcheck'
result = state._extend_and_expr(mock_expr, 'and', 'check')
self.assertEqual(result, [('and_expr', 'newcheck')])
mock_expr.add_check.assert_called_once_with('check')
@mock.patch.object(_checks, 'OrCheck', lambda x: x)
def test_make_or_expr(self):
state = _parser.ParseState()
result = state._make_or_expr('check1', 'or', 'check2')
self.assertEqual(result, [('or_expr', ['check1', 'check2'])])
def test_extend_or_expr(self):
state = _parser.ParseState()
mock_expr = mock.Mock()
mock_expr.add_check.return_value = 'newcheck'
result = state._extend_or_expr(mock_expr, 'or', 'check')
self.assertEqual(result, [('or_expr', 'newcheck')])
mock_expr.add_check.assert_called_once_with('check')
@mock.patch.object(_checks, 'NotCheck', lambda x: 'not %s' % x)
def test_make_not_expr(self):
state = _parser.ParseState()
result = state._make_not_expr('not', 'check')
self.assertEqual(result, [('check', 'not check')])
class ParseTextRuleTestCase(test_base.BaseTestCase):
def test_empty(self):
result = _parser._parse_text_rule('')
self.assertTrue(isinstance(result, _checks.TrueCheck))
@mock.patch.object(_parser, '_parse_tokenize',
return_value=[('tok1', 'val1'), ('tok2', 'val2')])
@mock.patch.object(_parser.ParseState, 'shift')
@mock.patch.object(_parser.ParseState, 'result', 'result')
def test_shifts(self, mock_shift, mock_parse_tokenize):
result = _parser._parse_text_rule('test rule')
self.assertEqual(result, 'result')
mock_parse_tokenize.assert_called_once_with('test rule')
mock_shift.assert_has_calls(
[mock.call('tok1', 'val1'), mock.call('tok2', 'val2')])
@mock.patch.object(_parser, '_parse_tokenize', return_value=[])
def test_fail(self, mock_parse_tokenize):
result = _parser._parse_text_rule('test rule')
self.assertTrue(isinstance(result, _checks.FalseCheck))
mock_parse_tokenize.assert_called_once_with('test rule')
class ParseRuleTestCase(test_base.BaseTestCase):
@mock.patch.object(_parser, '_parse_text_rule', return_value='text rule')
@mock.patch.object(_parser, '_parse_list_rule', return_value='list rule')
def test_parse_rule_string(self, mock_parse_list_rule,
mock_parse_text_rule):
result = _parser.parse_rule("a string")
self.assertEqual(result, 'text rule')
self.assertFalse(mock_parse_list_rule.called)
mock_parse_text_rule.assert_called_once_with('a string')
@mock.patch.object(_parser, '_parse_text_rule', return_value='text rule')
@mock.patch.object(_parser, '_parse_list_rule', return_value='list rule')
def test_parse_rule_list(self, mock_parse_list_rule, mock_parse_text_rule):
result = _parser.parse_rule([['a'], ['list']])
self.assertEqual(result, 'list rule')
self.assertFalse(mock_parse_text_rule.called)
mock_parse_list_rule.assert_called_once_with([['a'], ['list']])

View File

@ -15,26 +15,16 @@
"""Test of Policy Engine"""
import os
import mock
from oslo_concurrency.fixture import lockutils
from oslo_config import cfg
from oslo_config import fixture as config
from oslo_serialization import jsonutils
from oslotest import base as test_base
import six
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urlrequest
from oslo_policy import _checks
from oslo_policy import _parser
from oslo_policy.openstack.common import fileutils
from oslo_policy import policy
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..', 'tests/var'))
ENFORCER = policy.Enforcer(cfg.CONF)
from oslo_policy.tests import base
class MyException(Exception):
@ -79,7 +69,7 @@ class RulesTestCase(test_base.BaseTestCase):
self.assertEqual(rules['b'], 2)
self.assertEqual(rules['c'], 3)
@mock.patch.object(policy, '_parse_rule', lambda x: x)
@mock.patch.object(_parser, 'parse_rule', lambda x: x)
def test_load_json(self):
exemplar = """{
"admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
@ -108,26 +98,13 @@ class RulesTestCase(test_base.BaseTestCase):
"admin_or_owner": ""
}"""
rules = policy.Rules(dict(
admin_or_owner=policy.TrueCheck(),
admin_or_owner=_checks.TrueCheck(),
))
self.assertEqual(str(rules), exemplar)
class PolicyBaseTestCase(test_base.BaseTestCase):
def setUp(self):
super(PolicyBaseTestCase, self).setUp()
# NOTE(bnemec): Many of these tests use the same ENFORCER object, so
# I believe we need to serialize them.
self.useFixture(lockutils.LockFixture('policy-lock'))
self.CONF = self.useFixture(config.Config()).conf
self.CONF(args=['--config-dir', TEST_VAR_DIR])
self.enforcer = ENFORCER
self.addCleanup(self.enforcer.clear)
class EnforcerTest(PolicyBaseTestCase):
class EnforcerTest(base.PolicyBaseTestCase):
def test_load_file(self):
self.CONF.set_override('policy_dirs', [], group='oslo_policy')
@ -195,7 +172,7 @@ class EnforcerTest(PolicyBaseTestCase):
"cloudwatch:PutMetricData": ""
}"""
rules = policy.Rules.load_json(rules_json)
default_rule = policy.TrueCheck()
default_rule = _checks.TrueCheck()
enforcer = policy.Enforcer(cfg.CONF, default_rule=default_rule)
enforcer.set_rules(rules)
action = "cloudwatch:PutMetricData"
@ -204,9 +181,9 @@ class EnforcerTest(PolicyBaseTestCase):
def test_enforcer_force_reload_with_overwrite(self):
# Prepare in memory fake policies.
self.enforcer.set_rules({'test': policy._parse_rule('role:test')},
self.enforcer.set_rules({'test': _parser.parse_rule('role:test')},
use_conf=True)
self.enforcer.set_rules({'default': policy._parse_rule('role:fakeZ')},
self.enforcer.set_rules({'default': _parser.parse_rule('role:fakeZ')},
overwrite=False, # Keeps 'test' role.
use_conf=True)
@ -232,9 +209,9 @@ class EnforcerTest(PolicyBaseTestCase):
def test_enforcer_force_reload_without_overwrite(self):
# Prepare in memory fake policies.
self.enforcer.set_rules({'test': policy._parse_rule('role:test')},
self.enforcer.set_rules({'test': _parser.parse_rule('role:test')},
use_conf=True)
self.enforcer.set_rules({'default': policy._parse_rule('role:fakeZ')},
self.enforcer.set_rules({'default': _parser.parse_rule('role:fakeZ')},
overwrite=False, # Keeps 'test' role.
use_conf=True)
@ -346,23 +323,10 @@ class EnforcerTest(PolicyBaseTestCase):
self.assertEqual('bar_rule', enforcer.rules.default_rule)
class FakeCheck(policy.BaseCheck):
def __init__(self, result=None):
self.result = result
def __str__(self):
return str(self.result)
def __call__(self, target, creds, enforcer):
if self.result is not None:
return self.result
return (target, creds, enforcer)
class CheckFunctionTestCase(PolicyBaseTestCase):
class CheckFunctionTestCase(base.PolicyBaseTestCase):
def test_check_explicit(self):
rule = FakeCheck()
rule = base.FakeCheck()
result = self.enforcer.enforce(rule, "target", "creds")
self.assertEqual(result, ("target", "creds", self.enforcer))
@ -376,13 +340,13 @@ class CheckFunctionTestCase(PolicyBaseTestCase):
self.assertEqual(result, False)
def test_check_with_rule(self):
self.enforcer.set_rules(dict(default=FakeCheck()))
self.enforcer.set_rules(dict(default=base.FakeCheck()))
result = self.enforcer.enforce("default", "target", "creds")
self.assertEqual(result, ("target", "creds", self.enforcer))
def test_check_raises(self):
self.enforcer.set_rules(dict(default=policy.FalseCheck()))
self.enforcer.set_rules(dict(default=_checks.FalseCheck()))
try:
self.enforcer.enforce('rule', 'target', 'creds',
@ -393,755 +357,3 @@ class CheckFunctionTestCase(PolicyBaseTestCase):
self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2"))
else:
self.fail("enforcer.enforce() failed to raise requested exception")
class FalseCheckTestCase(test_base.BaseTestCase):
def test_str(self):
check = policy.FalseCheck()
self.assertEqual(str(check), '!')
def test_call(self):
check = policy.FalseCheck()
self.assertEqual(check('target', 'creds', None), False)
class TrueCheckTestCase(test_base.BaseTestCase):
def test_str(self):
check = policy.TrueCheck()
self.assertEqual(str(check), '@')
def test_call(self):
check = policy.TrueCheck()
self.assertEqual(check('target', 'creds', None), True)
class CheckForTest(policy.Check):
def __call__(self, target, creds, enforcer):
pass
class CheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = CheckForTest('kind', 'match')
self.assertEqual(check.kind, 'kind')
self.assertEqual(check.match, 'match')
def test_str(self):
check = CheckForTest('kind', 'match')
self.assertEqual(str(check), 'kind:match')
class NotCheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = policy.NotCheck('rule')
self.assertEqual(check.rule, 'rule')
def test_str(self):
check = policy.NotCheck('rule')
self.assertEqual(str(check), 'not rule')
def test_call_true(self):
rule = mock.Mock(return_value=True)
check = policy.NotCheck(rule)
self.assertEqual(check('target', 'cred', None), False)
rule.assert_called_once_with('target', 'cred', None)
def test_call_false(self):
rule = mock.Mock(return_value=False)
check = policy.NotCheck(rule)
self.assertEqual(check('target', 'cred', None), True)
rule.assert_called_once_with('target', 'cred', None)
class AndCheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = policy.AndCheck(['rule1', 'rule2'])
self.assertEqual(check.rules, ['rule1', 'rule2'])
def test_add_check(self):
check = policy.AndCheck(['rule1', 'rule2'])
check.add_check('rule3')
self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
def test_str(self):
check = policy.AndCheck(['rule1', 'rule2'])
self.assertEqual(str(check), '(rule1 and rule2)')
def test_call_all_false(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
check = policy.AndCheck(rules)
self.assertEqual(check('target', 'cred', None), False)
rules[0].assert_called_once_with('target', 'cred', None)
self.assertFalse(rules[1].called)
def test_call_first_true(self):
rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
check = policy.AndCheck(rules)
self.assertFalse(check('target', 'cred', None))
rules[0].assert_called_once_with('target', 'cred', None)
rules[1].assert_called_once_with('target', 'cred', None)
def test_call_second_true(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
check = policy.AndCheck(rules)
self.assertFalse(check('target', 'cred', None))
rules[0].assert_called_once_with('target', 'cred', None)
self.assertFalse(rules[1].called)
class OrCheckTestCase(test_base.BaseTestCase):
def test_init(self):
check = policy.OrCheck(['rule1', 'rule2'])
self.assertEqual(check.rules, ['rule1', 'rule2'])
def test_add_check(self):
check = policy.OrCheck(['rule1', 'rule2'])
check.add_check('rule3')
self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3'])
def test_str(self):
check = policy.OrCheck(['rule1', 'rule2'])
self.assertEqual(str(check), '(rule1 or rule2)')
def test_call_all_false(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)]
check = policy.OrCheck(rules)
self.assertEqual(check('target', 'cred', None), False)
rules[0].assert_called_once_with('target', 'cred', None)
rules[1].assert_called_once_with('target', 'cred', None)
def test_call_first_true(self):
rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)]
check = policy.OrCheck(rules)
self.assertEqual(check('target', 'cred', None), True)
rules[0].assert_called_once_with('target', 'cred', None)
self.assertFalse(rules[1].called)
def test_call_second_true(self):
rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)]
check = policy.OrCheck(rules)
self.assertEqual(check('target', 'cred', None), True)
rules[0].assert_called_once_with('target', 'cred', None)
rules[1].assert_called_once_with('target', 'cred', None)
class ParseCheckTestCase(test_base.BaseTestCase):
def test_false(self):
result = policy._parse_check('!')
self.assertTrue(isinstance(result, policy.FalseCheck))
def test_true(self):
result = policy._parse_check('@')
self.assertTrue(isinstance(result, policy.TrueCheck))
def test_bad_rule(self):
result = policy._parse_check('foobar')
self.assertTrue(isinstance(result, policy.FalseCheck))
@mock.patch.object(policy, '_checks', {})
def test_no_handler(self):
result = policy._parse_check('no:handler')
self.assertTrue(isinstance(result, policy.FalseCheck))
@mock.patch.object(policy, '_checks', {
'spam': mock.Mock(return_value="spam_check"),
None: mock.Mock(return_value="none_check"),
})
def test_check(self):
result = policy._parse_check('spam:handler')
self.assertEqual(result, 'spam_check')
policy._checks['spam'].assert_called_once_with('spam', 'handler')
self.assertFalse(policy._checks[None].called)
@mock.patch.object(policy, '_checks', {
None: mock.Mock(return_value="none_check"),
})
def test_check_default(self):
result = policy._parse_check('spam:handler')
self.assertEqual(result, 'none_check')
policy._checks[None].assert_called_once_with('spam', 'handler')
class ParseListRuleTestCase(test_base.BaseTestCase):
def test_empty(self):
result = policy._parse_list_rule([])
self.assertTrue(isinstance(result, policy.TrueCheck))
self.assertEqual(str(result), '@')
@mock.patch.object(policy, '_parse_check', FakeCheck)
def test_oneele_zeroele(self):
result = policy._parse_list_rule([[]])
self.assertTrue(isinstance(result, policy.FalseCheck))
self.assertEqual(str(result), '!')
@mock.patch.object(policy, '_parse_check', FakeCheck)
def test_oneele_bare(self):
result = policy._parse_list_rule(['rule'])
self.assertTrue(isinstance(result, FakeCheck))
self.assertEqual(result.result, 'rule')
self.assertEqual(str(result), 'rule')
@mock.patch.object(policy, '_parse_check', FakeCheck)
def test_oneele_oneele(self):
result = policy._parse_list_rule([['rule']])
self.assertTrue(isinstance(result, FakeCheck))
self.assertEqual(result.result, 'rule')
self.assertEqual(str(result), 'rule')
@mock.patch.object(policy, '_parse_check', FakeCheck)
def test_oneele_multi(self):
result = policy._parse_list_rule([['rule1', 'rule2']])
self.assertTrue(isinstance(result, policy.AndCheck))
self.assertEqual(len(result.rules), 2)
for i, value in enumerate(['rule1', 'rule2']):
self.assertTrue(isinstance(result.rules[i], FakeCheck))
self.assertEqual(result.rules[i].result, value)
self.assertEqual(str(result), '(rule1 and rule2)')
@mock.patch.object(policy, '_parse_check', FakeCheck)
def test_multi_oneele(self):
result = policy._parse_list_rule([['rule1'], ['rule2']])
self.assertTrue(isinstance(result, policy.OrCheck))
self.assertEqual(len(result.rules), 2)
for i, value in enumerate(['rule1', 'rule2']):
self.assertTrue(isinstance(result.rules[i], FakeCheck))
self.assertEqual(result.rules[i].result, value)
self.assertEqual(str(result), '(rule1 or rule2)')
@mock.patch.object(policy, '_parse_check', FakeCheck)
def test_multi_multi(self):
result = policy._parse_list_rule([['rule1', 'rule2'],
['rule3', 'rule4']])
self.assertTrue(isinstance(result, policy.OrCheck))
self.assertEqual(len(result.rules), 2)
for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]):
self.assertTrue(isinstance(result.rules[i], policy.AndCheck))
self.assertEqual(len(result.rules[i].rules), 2)
for j, value in enumerate(values):
self.assertTrue(isinstance(result.rules[i].rules[j],
FakeCheck))
self.assertEqual(result.rules[i].rules[j].result, value)
self.assertEqual(str(result),
'((rule1 and rule2) or (rule3 and rule4))')
class ParseTokenizeTestCase(test_base.BaseTestCase):
@mock.patch.object(policy, '_parse_check', lambda x: x)
def test_tokenize(self):
exemplar = ("(( ( ((() And)) or ) (check:%(miss)s) not)) "
"'a-string' \"another-string\"")
expected = [
('(', '('), ('(', '('), ('(', '('), ('(', '('), ('(', '('),
('(', '('), (')', ')'), ('and', 'And'),
(')', ')'), (')', ')'), ('or', 'or'), (')', ')'), ('(', '('),
('check', 'check:%(miss)s'), (')', ')'), ('not', 'not'),
(')', ')'), (')', ')'),
('string', 'a-string'),
('string', 'another-string'),
]
result = list(policy._parse_tokenize(exemplar))
self.assertEqual(result, expected)
class ParseStateMetaTestCase(test_base.BaseTestCase):
def test_reducer(self):
@policy.reducer('a', 'b', 'c')
@policy.reducer('d', 'e', 'f')
def spam():
pass
self.assertTrue(hasattr(spam, 'reducers'))
self.assertEqual(spam.reducers, [['d', 'e', 'f'], ['a', 'b', 'c']])
def test_parse_state_meta(self):
@six.add_metaclass(policy._ParseStateMeta)
class FakeState(object):
@policy.reducer('a', 'b', 'c')
@policy.reducer('d', 'e', 'f')
def reduce1(self):
pass
@policy.reducer('g', 'h', 'i')
def reduce2(self):
pass
self.assertTrue(hasattr(FakeState, 'reducers'))
for reduction, reducer in FakeState.reducers:
if (reduction == ['a', 'b', 'c'] or
reduction == ['d', 'e', 'f']):
self.assertEqual(reducer, 'reduce1')
elif reduction == ['g', 'h', 'i']:
self.assertEqual(reducer, 'reduce2')
else:
self.fail("Unrecognized reducer discovered")
class ParseStateTestCase(test_base.BaseTestCase):
def test_init(self):
state = policy._ParseState()
self.assertEqual(state.tokens, [])
self.assertEqual(state.values, [])
@mock.patch.object(policy._ParseState, 'reducers', [(['tok1'], 'meth')])
@mock.patch.object(policy._ParseState, 'meth', create=True)
def test_reduce_none(self, mock_meth):
state = policy._ParseState()
state.tokens = ['tok2']
state.values = ['val2']
state.reduce()
self.assertEqual(state.tokens, ['tok2'])
self.assertEqual(state.values, ['val2'])
self.assertFalse(mock_meth.called)
@mock.patch.object(policy._ParseState, 'reducers',
[(['tok1', 'tok2'], 'meth')])
@mock.patch.object(policy._ParseState, 'meth', create=True)
def test_reduce_short(self, mock_meth):
state = policy._ParseState()
state.tokens = ['tok1']
state.values = ['val1']
state.reduce()
self.assertEqual(state.tokens, ['tok1'])
self.assertEqual(state.values, ['val1'])
self.assertFalse(mock_meth.called)
@mock.patch.object(policy._ParseState, 'reducers',
[(['tok1', 'tok2'], 'meth')])
@mock.patch.object(policy._ParseState, 'meth', create=True,
return_value=[('tok3', 'val3')])
def test_reduce_one(self, mock_meth):
state = policy._ParseState()
state.tokens = ['tok1', 'tok2']
state.values = ['val1', 'val2']
state.reduce()
self.assertEqual(state.tokens, ['tok3'])
self.assertEqual(state.values, ['val3'])
mock_meth.assert_called_once_with('val1', 'val2')
@mock.patch.object(policy._ParseState, 'reducers', [
(['tok1', 'tok4'], 'meth2'),
(['tok2', 'tok3'], 'meth1'),
])
@mock.patch.object(policy._ParseState, 'meth1', create=True,
return_value=[('tok4', 'val4')])
@mock.patch.object(policy._ParseState, 'meth2', create=True,
return_value=[('tok5', 'val5')])
def test_reduce_two(self, mock_meth2, mock_meth1):
state = policy._ParseState()
state.tokens = ['tok1', 'tok2', 'tok3']
state.values = ['val1', 'val2', 'val3']
state.reduce()
self.assertEqual(state.tokens, ['tok5'])
self.assertEqual(state.values, ['val5'])
mock_meth1.assert_called_once_with('val2', 'val3')
mock_meth2.assert_called_once_with('val1', 'val4')
@mock.patch.object(policy._ParseState, 'reducers',
[(['tok1', 'tok2'], 'meth')])
@mock.patch.object(policy._ParseState, 'meth', create=True,
return_value=[('tok3', 'val3'), ('tok4', 'val4')])
def test_reduce_multi(self, mock_meth):
state = policy._ParseState()
state.tokens = ['tok1', 'tok2']
state.values = ['val1', 'val2']
state.reduce()
self.assertEqual(state.tokens, ['tok3', 'tok4'])
self.assertEqual(state.values, ['val3', 'val4'])
mock_meth.assert_called_once_with('val1', 'val2')
def test_shift(self):
state = policy._ParseState()
with mock.patch.object(policy._ParseState, 'reduce') as mock_reduce:
state.shift('token', 'value')
self.assertEqual(state.tokens, ['token'])
self.assertEqual(state.values, ['value'])
mock_reduce.assert_called_once_with()
def test_result_empty(self):
state = policy._ParseState()
self.assertRaises(ValueError, lambda: state.result)
def test_result_unreduced(self):
state = policy._ParseState()
state.tokens = ['tok1', 'tok2']
state.values = ['val1', 'val2']
self.assertRaises(ValueError, lambda: state.result)
def test_result(self):
state = policy._ParseState()
state.tokens = ['token']
state.values = ['value']
self.assertEqual(state.result, 'value')
def test_wrap_check(self):
state = policy._ParseState()
result = state._wrap_check('(', 'the_check', ')')
self.assertEqual(result, [('check', 'the_check')])
@mock.patch.object(policy, 'AndCheck', lambda x: x)
def test_make_and_expr(self):
state = policy._ParseState()
result = state._make_and_expr('check1', 'and', 'check2')
self.assertEqual(result, [('and_expr', ['check1', 'check2'])])
def test_extend_and_expr(self):
state = policy._ParseState()
mock_expr = mock.Mock()
mock_expr.add_check.return_value = 'newcheck'
result = state._extend_and_expr(mock_expr, 'and', 'check')
self.assertEqual(result, [('and_expr', 'newcheck')])
mock_expr.add_check.assert_called_once_with('check')
@mock.patch.object(policy, 'OrCheck', lambda x: x)
def test_make_or_expr(self):
state = policy._ParseState()
result = state._make_or_expr('check1', 'or', 'check2')
self.assertEqual(result, [('or_expr', ['check1', 'check2'])])
def test_extend_or_expr(self):
state = policy._ParseState()
mock_expr = mock.Mock()
mock_expr.add_check.return_value = 'newcheck'
result = state._extend_or_expr(mock_expr, 'or', 'check')
self.assertEqual(result, [('or_expr', 'newcheck')])
mock_expr.add_check.assert_called_once_with('check')
@mock.patch.object(policy, 'NotCheck', lambda x: 'not %s' % x)
def test_make_not_expr(self):
state = policy._ParseState()
result = state._make_not_expr('not', 'check')
self.assertEqual(result, [('check', 'not check')])
class ParseTextRuleTestCase(test_base.BaseTestCase):
def test_empty(self):
result = policy._parse_text_rule('')
self.assertTrue(isinstance(result, policy.TrueCheck))
@mock.patch.object(policy, '_parse_tokenize',
return_value=[('tok1', 'val1'), ('tok2', 'val2')])
@mock.patch.object(policy._ParseState, 'shift')
@mock.patch.object(policy._ParseState, 'result', 'result')
def test_shifts(self, mock_shift, mock_parse_tokenize):
result = policy._parse_text_rule('test rule')
self.assertEqual(result, 'result')
mock_parse_tokenize.assert_called_once_with('test rule')
mock_shift.assert_has_calls(
[mock.call('tok1', 'val1'), mock.call('tok2', 'val2')])
@mock.patch.object(policy, '_parse_tokenize', return_value=[])
def test_fail(self, mock_parse_tokenize):
result = policy._parse_text_rule('test rule')
self.assertTrue(isinstance(result, policy.FalseCheck))
mock_parse_tokenize.assert_called_once_with('test rule')
class ParseRuleTestCase(test_base.BaseTestCase):
@mock.patch.object(policy, '_parse_text_rule', return_value='text rule')
@mock.patch.object(policy, '_parse_list_rule', return_value='list rule')
def test_parse_rule_string(self, mock_parse_list_rule,
mock_parse_text_rule):
result = policy._parse_rule("a string")
self.assertEqual(result, 'text rule')
self.assertFalse(mock_parse_list_rule.called)
mock_parse_text_rule.assert_called_once_with('a string')
@mock.patch.object(policy, '_parse_text_rule', return_value='text rule')
@mock.patch.object(policy, '_parse_list_rule', return_value='list rule')
def test_parse_rule_list(self, mock_parse_list_rule, mock_parse_text_rule):
result = policy._parse_rule([['a'], ['list']])
self.assertEqual(result, 'list rule')
self.assertFalse(mock_parse_text_rule.called)
mock_parse_list_rule.assert_called_once_with([['a'], ['list']])
class CheckRegisterTestCase(test_base.BaseTestCase):
@mock.patch.object(policy, '_checks', {})
def test_register_check(self):
class TestCheck(policy.Check):
pass
policy.register('spam', TestCheck)
self.assertEqual(policy._checks, dict(spam=TestCheck))
@mock.patch.object(policy, '_checks', {})
def test_register_check_decorator(self):
@policy.register('spam')
class TestCheck(policy.Check):
pass
self.assertEqual(policy._checks, dict(spam=TestCheck))
class RuleCheckTestCase(test_base.BaseTestCase):
@mock.patch.object(ENFORCER, 'rules', {})
def test_rule_missing(self):
check = policy.RuleCheck('rule', 'spam')
self.assertEqual(check('target', 'creds', ENFORCER), False)
@mock.patch.object(ENFORCER, 'rules',
dict(spam=mock.Mock(return_value=False)))
def test_rule_false(self):
enforcer = ENFORCER
check = policy.RuleCheck('rule', 'spam')
self.assertEqual(check('target', 'creds', enforcer), False)
enforcer.rules['spam'].assert_called_once_with('target', 'creds',
enforcer)
@mock.patch.object(ENFORCER, 'rules',
dict(spam=mock.Mock(return_value=True)))
def test_rule_true(self):
enforcer = ENFORCER
check = policy.RuleCheck('rule', 'spam')
self.assertEqual(check('target', 'creds', enforcer), True)
enforcer.rules['spam'].assert_called_once_with('target', 'creds',
enforcer)
class RoleCheckTestCase(PolicyBaseTestCase):
def test_accept(self):
check = policy.RoleCheck('role', 'sPaM')
self.assertEqual(check('target', dict(roles=['SpAm']),
self.enforcer), True)
def test_reject(self):
check = policy.RoleCheck('role', 'spam')
self.assertEqual(check('target', dict(roles=[]), self.enforcer), False)
class HttpCheckTestCase(PolicyBaseTestCase):
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
return result
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('True'))
def test_accept(self, mock_urlopen):
check = policy.HttpCheck('http', '//example.com/%(name)s')
self.assertEqual(check(dict(name='target', spam='spammer'),
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
True)
self.assertEqual(mock_urlopen.call_count, 1)
args = mock_urlopen.call_args[0]
self.assertEqual(args[0], 'http://example.com/target')
self.assertEqual(self.decode_post_data(args[1]), dict(
target=dict(name='target', spam='spammer'),
credentials=dict(user='user', roles=['a', 'b', 'c']),
))
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('other'))
def test_reject(self, mock_urlopen):
check = policy.HttpCheck('http', '//example.com/%(name)s')
self.assertEqual(check(dict(name='target', spam='spammer'),
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
False)
self.assertEqual(mock_urlopen.call_count, 1)
args = mock_urlopen.call_args[0]
self.assertEqual(args[0], 'http://example.com/target')
self.assertEqual(self.decode_post_data(args[1]), dict(
target=dict(name='target', spam='spammer'),
credentials=dict(user='user', roles=['a', 'b', 'c']),
))
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('True'))
def test_http_with_objects_in_target(self, mock_urlopen):
check = policy.HttpCheck('http', '//example.com/%(name)s')
target = {'a': object(),
'name': 'target',
'b': 'test data'}
self.assertEqual(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
True)
@mock.patch.object(urlrequest, 'urlopen',
return_value=six.StringIO('True'))
def test_http_with_strings_in_target(self, mock_urlopen):
check = policy.HttpCheck('http', '//example.com/%(name)s')
target = {'a': 'some_string',
'name': 'target',
'b': 'test data'}
self.assertEqual(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer),
True)
class GenericCheckTestCase(PolicyBaseTestCase):
def test_no_cred(self):
check = policy.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False)
def test_cred_mismatch(self):
check = policy.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name='spam'),
dict(name='ham'),
self.enforcer), False)
def test_accept(self):
check = policy.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name='spam'),
dict(name='spam'),
self.enforcer), True)
def test_no_key_match_in_target(self):
check = policy.GenericCheck('name', '%(name)s')
self.assertEqual(check(dict(name1='spam'),
dict(name='spam'),
self.enforcer), False)
def test_constant_string_mismatch(self):
check = policy.GenericCheck("'spam'", '%(name)s')
self.assertEqual(check(dict(name='ham'),
{},
self.enforcer), False)
def test_constant_string_accept(self):
check = policy.GenericCheck("'spam'", '%(name)s')
self.assertEqual(check(dict(name='spam'),
{},
self.enforcer), True)
def test_constant_literal_mismatch(self):
check = policy.GenericCheck("True", '%(enabled)s')
self.assertEqual(check(dict(enabled=False),
{},
self.enforcer), False)
def test_constant_literal_accept(self):
check = policy.GenericCheck("True", '%(enabled)s')
self.assertEqual(check(dict(enabled=True),
{},
self.enforcer), True)
def test_deep_credentials_dictionary_lookup(self):
check = policy.GenericCheck("a.b.c.d", 'APPLES')
credentials = {'a': {'b': {'c': {'d': 'APPLES'}}}}
self.assertEqual(check({},
credentials,
self.enforcer), True)
def test_missing_credentials_dictionary_lookup(self):
credentials = {'a': 'APPLES', 'o': {'t': 'ORANGES'}}
# First a valid check - rest of case is expecting failures
# Should prove the basic credentials structure before we test
# for failure cases.
check = policy.GenericCheck("o.t", 'ORANGES')
self.assertEqual(check({},
credentials,
self.enforcer), True)
# Case where final key is missing
check = policy.GenericCheck("o.v", 'ORANGES')
self.assertEqual(check({},
credentials,
self.enforcer), False)
# Attempt to access key under a missing dictionary
check = policy.GenericCheck("q.v", 'APPLES')
self.assertEqual(check({},
credentials,
self.enforcer), False)