diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py new file mode 100644 index 00000000..7860b61d --- /dev/null +++ b/oslo_policy/_checks.py @@ -0,0 +1,310 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc +import ast +import copy + +from oslo_serialization import jsonutils +import six +import six.moves.urllib.parse as urlparse +import six.moves.urllib.request as urlrequest + + +registered_checks = {} + + +@six.add_metaclass(abc.ABCMeta) +class BaseCheck(object): + """Abstract base class for Check classes.""" + + @abc.abstractmethod + def __str__(self): + """String representation of the Check tree rooted at this node.""" + + pass + + @abc.abstractmethod + def __call__(self, target, cred, enforcer): + """Triggers if instance of the class is called. + + Performs the check. Returns False to reject the access or a + true value (not necessary True) to accept the access. + """ + + pass + + +class FalseCheck(BaseCheck): + """A policy check that always returns ``False`` (disallow).""" + + def __str__(self): + """Return a string representation of this check.""" + + return "!" + + def __call__(self, target, cred, enforcer): + """Check the policy.""" + + return False + + +class TrueCheck(BaseCheck): + """A policy check that always returns ``True`` (allow).""" + + def __str__(self): + """Return a string representation of this check.""" + + return "@" + + def __call__(self, target, cred, enforcer): + """Check the policy.""" + + return True + + +class Check(BaseCheck): + """A base class to allow for user-defined policy checks. + + :param kind: The kind of the check, i.e., the field before the ``:``. + :param match: The match of the check, i.e., the field after the ``:``. + + """ + + def __init__(self, kind, match): + self.kind = kind + self.match = match + + def __str__(self): + """Return a string representation of this check.""" + + return "%s:%s" % (self.kind, self.match) + + +class NotCheck(BaseCheck): + """Implements the "not" logical operator. + + A policy check that inverts the result of another policy check. + + :param rule: The rule to negate. Must be a Check. + + """ + + def __init__(self, rule): + self.rule = rule + + def __str__(self): + """Return a string representation of this check.""" + + return "not %s" % self.rule + + def __call__(self, target, cred, enforcer): + """Check the policy. + + Returns the logical inverse of the wrapped check. + """ + + return not self.rule(target, cred, enforcer) + + +class AndCheck(BaseCheck): + """Implements the "and" logical operator. + + A policy check that requires that a list of other checks all return True. + + :param list rules: rules that will be tested. + + """ + + def __init__(self, rules): + self.rules = rules + + def __str__(self): + """Return a string representation of this check.""" + + return "(%s)" % ' and '.join(str(r) for r in self.rules) + + def __call__(self, target, cred, enforcer): + """Check the policy. + + Requires that all rules accept in order to return True. + """ + + for rule in self.rules: + if not rule(target, cred, enforcer): + return False + + return True + + def add_check(self, rule): + """Adds rule to be tested. + + Allows addition of another rule to the list of rules that will + be tested. + + :returns: self + :rtype: :class:`.AndCheck` + """ + + self.rules.append(rule) + return self + + +class OrCheck(BaseCheck): + """Implements the "or" operator. + + A policy check that requires that at least one of a list of other + checks returns ``True``. + + :param rules: A list of rules that will be tested. + + """ + + def __init__(self, rules): + self.rules = rules + + def __str__(self): + """Return a string representation of this check.""" + + return "(%s)" % ' or '.join(str(r) for r in self.rules) + + def __call__(self, target, cred, enforcer): + """Check the policy. + + Requires that at least one rule accept in order to return True. + """ + + for rule in self.rules: + if rule(target, cred, enforcer): + return True + return False + + def add_check(self, rule): + """Adds rule to be tested. + + Allows addition of another rule to the list of rules that will + be tested. Returns the OrCheck object for convenience. + """ + + self.rules.append(rule) + return self + + +def register(name, func=None): + """Register a function or :class:`.Check` class as a policy check. + + :param name: Gives the name of the check type, e.g., "rule", + "role", etc. If name is ``None``, a default check type + will be registered. + :param func: If given, provides the function or class to register. + If not given, returns a function taking one argument + to specify the function or class to register, + allowing use as a decorator. + """ + + # Perform the actual decoration by registering the function or + # class. Returns the function or class for compliance with the + # decorator interface. + def decorator(func): + registered_checks[name] = func + return func + + # If the function or class is given, do the registration + if func: + return decorator(func) + + return decorator + + +@register("rule") +class RuleCheck(Check): + """Recursively checks credentials based on the defined rules.""" + + def __call__(self, target, creds, enforcer): + try: + return enforcer.rules[self.match](target, creds, enforcer) + except KeyError: + # We don't have any matching rule; fail closed + return False + + +@register("role") +class RoleCheck(Check): + """Check that there is a matching role in the ``creds`` dict.""" + + def __call__(self, target, creds, enforcer): + return self.match.lower() in [x.lower() for x in creds['roles']] + + +@register('http') +class HttpCheck(Check): + """Check ``http:`` rules by calling to a remote server. + + This example implementation simply verifies that the response + is exactly ``True``. + """ + + def __call__(self, target, creds, enforcer): + url = ('http:' + self.match) % target + + # Convert instances of object() in target temporarily to + # empty dict to avoid circular reference detection + # errors in jsonutils.dumps(). + temp_target = copy.deepcopy(target) + for key in target.keys(): + element = target.get(key) + if type(element) is object: + temp_target[key] = {} + + data = {'target': jsonutils.dumps(temp_target), + 'credentials': jsonutils.dumps(creds)} + post_data = urlparse.urlencode(data) + f = urlrequest.urlopen(url, post_data) + return f.read() == "True" + + +@register(None) +class GenericCheck(Check): + """Check an individual match. + + Matches look like: + + - tenant:%(tenant_id)s + - role:compute:admin + - True:%(user.enabled)s + - 'Member':%(role.name)s + """ + + def __call__(self, target, creds, enforcer): + try: + match = self.match % target + except KeyError: + # While doing GenericCheck if key not + # present in Target return false + return False + + try: + # Try to interpret self.kind as a literal + leftval = ast.literal_eval(self.kind) + except ValueError: + try: + kind_parts = self.kind.split('.') + leftval = creds + for kind_part in kind_parts: + leftval = leftval[kind_part] + except KeyError: + return False + return match == six.text_type(leftval) diff --git a/oslo_policy/_parser.py b/oslo_policy/_parser.py new file mode 100644 index 00000000..9b296be8 --- /dev/null +++ b/oslo_policy/_parser.py @@ -0,0 +1,341 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import re + +from oslo_log import log as logging +import six + +from oslo_policy import _checks +from oslo_policy import _i18n + + +LOG = logging.getLogger(__name__) + +_, _LE, _LI = _i18n._, _i18n._LE, _i18n._LI + + +def reducer(*tokens): + """Decorator for reduction methods. + + Arguments are a sequence of tokens, in order, which should trigger running + this reduction method. + """ + + def decorator(func): + # Make sure we have a list of reducer sequences + if not hasattr(func, 'reducers'): + func.reducers = [] + + # Add the tokens to the list of reducer sequences + func.reducers.append(list(tokens)) + + return func + + return decorator + + +class ParseStateMeta(type): + """Metaclass for the :class:`.ParseState` class. + + Facilitates identifying reduction methods. + """ + + def __new__(mcs, name, bases, cls_dict): + """Create the class. + + Injects the 'reducers' list, a list of tuples matching token sequences + to the names of the corresponding reduction methods. + """ + + reducers = [] + + for key, value in cls_dict.items(): + if not hasattr(value, 'reducers'): + continue + for reduction in value.reducers: + reducers.append((reduction, key)) + + cls_dict['reducers'] = reducers + + return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict) + + +@six.add_metaclass(ParseStateMeta) +class ParseState(object): + """Implement the core of parsing the policy language. + + Uses a greedy reduction algorithm to reduce a sequence of tokens into + a single terminal, the value of which will be the root of the + :class:`Check` tree. + + .. note:: + + Error reporting is rather lacking. The best we can get with this + parser formulation is an overall "parse failed" error. Fortunately, the + policy language is simple enough that this shouldn't be that big a + problem. + """ + + def __init__(self): + """Initialize the ParseState.""" + + self.tokens = [] + self.values = [] + + def reduce(self): + """Perform a greedy reduction of the token stream. + + If a reducer method matches, it will be executed, then the + :meth:`reduce` method will be called recursively to search for any more + possible reductions. + """ + + for reduction, methname in self.reducers: + if (len(self.tokens) >= len(reduction) and + self.tokens[-len(reduction):] == reduction): + # Get the reduction method + meth = getattr(self, methname) + + # Reduce the token stream + results = meth(*self.values[-len(reduction):]) + + # Update the tokens and values + self.tokens[-len(reduction):] = [r[0] for r in results] + self.values[-len(reduction):] = [r[1] for r in results] + + # Check for any more reductions + return self.reduce() + + def shift(self, tok, value): + """Adds one more token to the state. + + Calls :meth:`reduce`. + """ + + self.tokens.append(tok) + self.values.append(value) + + # Do a greedy reduce... + self.reduce() + + @property + def result(self): + """Obtain the final result of the parse. + + :raises ValueError: If the parse failed to reduce to a single result. + """ + + if len(self.values) != 1: + raise ValueError("Could not parse rule") + return self.values[0] + + @reducer('(', 'check', ')') + @reducer('(', 'and_expr', ')') + @reducer('(', 'or_expr', ')') + def _wrap_check(self, _p1, check, _p2): + """Turn parenthesized expressions into a 'check' token.""" + + return [('check', check)] + + @reducer('check', 'and', 'check') + def _make_and_expr(self, check1, _and, check2): + """Create an 'and_expr'. + + Join two checks by the 'and' operator. + """ + + return [('and_expr', _checks.AndCheck([check1, check2]))] + + @reducer('and_expr', 'and', 'check') + def _extend_and_expr(self, and_expr, _and, check): + """Extend an 'and_expr' by adding one more check.""" + + return [('and_expr', and_expr.add_check(check))] + + @reducer('check', 'or', 'check') + def _make_or_expr(self, check1, _or, check2): + """Create an 'or_expr'. + + Join two checks by the 'or' operator. + """ + + return [('or_expr', _checks.OrCheck([check1, check2]))] + + @reducer('or_expr', 'or', 'check') + def _extend_or_expr(self, or_expr, _or, check): + """Extend an 'or_expr' by adding one more check.""" + + return [('or_expr', or_expr.add_check(check))] + + @reducer('not', 'check') + def _make_not_expr(self, _not, check): + """Invert the result of another check.""" + + return [('check', _checks.NotCheck(check))] + + +def _parse_check(rule): + """Parse a single base check rule into an appropriate Check object.""" + + # Handle the special checks + if rule == '!': + return _checks.FalseCheck() + elif rule == '@': + return _checks.TrueCheck() + + try: + kind, match = rule.split(':', 1) + except Exception: + LOG.exception(_LE("Failed to understand rule %s") % rule) + # If the rule is invalid, we'll fail closed + return _checks.FalseCheck() + + # Find what implements the check + if kind in _checks.registered_checks: + return _checks.registered_checks[kind](kind, match) + elif None in _checks.registered_checks: + return _checks.registered_checks[None](kind, match) + else: + LOG.error(_LE("No handler for matches of kind %s") % kind) + return _checks.FalseCheck() + + +def _parse_list_rule(rule): + """Translates the old list-of-lists syntax into a tree of Check objects. + + Provided for backwards compatibility. + """ + + # Empty rule defaults to True + if not rule: + return _checks.TrueCheck() + + # Outer list is joined by "or"; inner list by "and" + or_list = [] + for inner_rule in rule: + # Elide empty inner lists + if not inner_rule: + continue + + # Handle bare strings + if isinstance(inner_rule, six.string_types): + inner_rule = [inner_rule] + + # Parse the inner rules into Check objects + and_list = [_parse_check(r) for r in inner_rule] + + # Append the appropriate check to the or_list + if len(and_list) == 1: + or_list.append(and_list[0]) + else: + or_list.append(_checks.AndCheck(and_list)) + + # If we have only one check, omit the "or" + if not or_list: + return _checks.FalseCheck() + elif len(or_list) == 1: + return or_list[0] + + return _checks.OrCheck(or_list) + + +# Used for tokenizing the policy language +_tokenize_re = re.compile(r'\s+') + + +def _parse_tokenize(rule): + """Tokenizer for the policy language. + + Most of the single-character tokens are specified in the + _tokenize_re; however, parentheses need to be handled specially, + because they can appear inside a check string. Thankfully, those + parentheses that appear inside a check string can never occur at + the very beginning or end ("%(variable)s" is the correct syntax). + """ + + for tok in _tokenize_re.split(rule): + # Skip empty tokens + if not tok or tok.isspace(): + continue + + # Handle leading parens on the token + clean = tok.lstrip('(') + for i in range(len(tok) - len(clean)): + yield '(', '(' + + # If it was only parentheses, continue + if not clean: + continue + else: + tok = clean + + # Handle trailing parens on the token + clean = tok.rstrip(')') + trail = len(tok) - len(clean) + + # Yield the cleaned token + lowered = clean.lower() + if lowered in ('and', 'or', 'not'): + # Special tokens + yield lowered, clean + elif clean: + # Not a special token, but not composed solely of ')' + if len(tok) >= 2 and ((tok[0], tok[-1]) in + [('"', '"'), ("'", "'")]): + # It's a quoted string + yield 'string', tok[1:-1] + else: + yield 'check', _parse_check(clean) + + # Yield the trailing parens + for i in range(trail): + yield ')', ')' + + +def _parse_text_rule(rule): + """Parses policy to the tree. + + Translates a policy written in the policy language into a tree of + Check objects. + """ + + # Empty rule means always accept + if not rule: + return _checks.TrueCheck() + + # Parse the token stream + state = ParseState() + for tok, value in _parse_tokenize(rule): + state.shift(tok, value) + + try: + return state.result + except ValueError: + # Couldn't parse the rule + LOG.exception(_LE("Failed to understand rule %s") % rule) + + # Fail closed + return _checks.FalseCheck() + + +def parse_rule(rule): + """Parses a policy rule into a tree of :class:`.Check` objects.""" + + # If the rule is a string, it's in the policy language + if isinstance(rule, six.string_types): + return _parse_text_rule(rule) + return _parse_list_rule(rule) diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py index 583593da..343d6228 100644 --- a/oslo_policy/policy.py +++ b/oslo_policy/policy.py @@ -202,22 +202,19 @@ by setting the ``policy_default_rule`` configuration setting to the desired rule name. """ -import abc -import ast -import copy import os -import re from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils import six -import six.moves.urllib.parse as urlparse -import six.moves.urllib.request as urlrequest -from oslo_policy._i18n import _, _LE, _LI +from oslo_policy import _checks +from oslo_policy import _i18n +from oslo_policy import _parser from oslo_policy.openstack.common import fileutils +_, _LI = _i18n._, _i18n._LI _opts = [ cfg.StrOpt('policy_file', @@ -243,7 +240,9 @@ _opts = [ LOG = logging.getLogger(__name__) -_checks = {} +register = _checks.register +BaseCheck = _checks.BaseCheck +Check = _checks.Check class PolicyNotAuthorized(Exception): @@ -262,7 +261,7 @@ class Rules(dict): """Allow loading of JSON rule data.""" # Suck in the JSON data and parse the rules - rules = dict((k, _parse_rule(v)) for k, v in + rules = dict((k, _parser.parse_rule(v)) for k, v in jsonutils.loads(data).items()) return cls(rules, default_rule) @@ -284,7 +283,7 @@ class Rules(dict): if not self.default_rule: raise KeyError(key) - if isinstance(self.default_rule, BaseCheck): + if isinstance(self.default_rule, _checks.BaseCheck): return self.default_rule # We need to check this or we can get infinite recursion @@ -301,7 +300,7 @@ class Rules(dict): out_rules = {} for key, value in self.items(): # Use empty string for singleton TrueCheck instances - if isinstance(value, TrueCheck): + if isinstance(value, _checks.TrueCheck): out_rules[key] = '' else: out_rules[key] = str(value) @@ -460,7 +459,7 @@ class Enforcer(object): self.load_rules() # Allow the rule to be a Check tree - if isinstance(rule, BaseCheck): + if isinstance(rule, _checks.BaseCheck): result = rule(target, creds, self) elif not self.rules: # No rules to reference means we're going to fail closed @@ -482,599 +481,3 @@ class Enforcer(object): raise PolicyNotAuthorized(rule) return result - - -@six.add_metaclass(abc.ABCMeta) -class BaseCheck(object): - """Abstract base class for Check classes.""" - - @abc.abstractmethod - def __str__(self): - """String representation of the Check tree rooted at this node.""" - - pass - - @abc.abstractmethod - def __call__(self, target, cred, enforcer): - """Triggers if instance of the class is called. - - Performs the check. Returns False to reject the access or a - true value (not necessary True) to accept the access. - """ - - pass - - -class FalseCheck(BaseCheck): - """A policy check that always returns ``False`` (disallow).""" - - def __str__(self): - """Return a string representation of this check.""" - - return "!" - - def __call__(self, target, cred, enforcer): - """Check the policy.""" - - return False - - -class TrueCheck(BaseCheck): - """A policy check that always returns ``True`` (allow).""" - - def __str__(self): - """Return a string representation of this check.""" - - return "@" - - def __call__(self, target, cred, enforcer): - """Check the policy.""" - - return True - - -class Check(BaseCheck): - """A base class to allow for user-defined policy checks. - - :param kind: The kind of the check, i.e., the field before the ``:``. - :param match: The match of the check, i.e., the field after the ``:``. - - """ - - def __init__(self, kind, match): - self.kind = kind - self.match = match - - def __str__(self): - """Return a string representation of this check.""" - - return "%s:%s" % (self.kind, self.match) - - -class NotCheck(BaseCheck): - """Implements the "not" logical operator. - - A policy check that inverts the result of another policy check. - - :param rule: The rule to negate. Must be a Check. - - """ - - def __init__(self, rule): - self.rule = rule - - def __str__(self): - """Return a string representation of this check.""" - - return "not %s" % self.rule - - def __call__(self, target, cred, enforcer): - """Check the policy. - - Returns the logical inverse of the wrapped check. - """ - - return not self.rule(target, cred, enforcer) - - -class AndCheck(BaseCheck): - """Implements the "and" logical operator. - - A policy check that requires that a list of other checks all return True. - - :param list rules: rules that will be tested. - - """ - - def __init__(self, rules): - self.rules = rules - - def __str__(self): - """Return a string representation of this check.""" - - return "(%s)" % ' and '.join(str(r) for r in self.rules) - - def __call__(self, target, cred, enforcer): - """Check the policy. - - Requires that all rules accept in order to return True. - """ - - for rule in self.rules: - if not rule(target, cred, enforcer): - return False - - return True - - def add_check(self, rule): - """Adds rule to be tested. - - Allows addition of another rule to the list of rules that will - be tested. - - :returns: self - :rtype: :class:`.AndCheck` - """ - - self.rules.append(rule) - return self - - -class OrCheck(BaseCheck): - """Implements the "or" operator. - - A policy check that requires that at least one of a list of other - checks returns ``True``. - - :param rules: A list of rules that will be tested. - - """ - - def __init__(self, rules): - self.rules = rules - - def __str__(self): - """Return a string representation of this check.""" - - return "(%s)" % ' or '.join(str(r) for r in self.rules) - - def __call__(self, target, cred, enforcer): - """Check the policy. - - Requires that at least one rule accept in order to return True. - """ - - for rule in self.rules: - if rule(target, cred, enforcer): - return True - return False - - def add_check(self, rule): - """Adds rule to be tested. - - Allows addition of another rule to the list of rules that will - be tested. Returns the OrCheck object for convenience. - """ - - self.rules.append(rule) - return self - - -def _parse_check(rule): - """Parse a single base check rule into an appropriate Check object.""" - - # Handle the special checks - if rule == '!': - return FalseCheck() - elif rule == '@': - return TrueCheck() - - try: - kind, match = rule.split(':', 1) - except Exception: - LOG.exception(_LE("Failed to understand rule %s") % rule) - # If the rule is invalid, we'll fail closed - return FalseCheck() - - # Find what implements the check - if kind in _checks: - return _checks[kind](kind, match) - elif None in _checks: - return _checks[None](kind, match) - else: - LOG.error(_LE("No handler for matches of kind %s") % kind) - return FalseCheck() - - -def _parse_list_rule(rule): - """Translates the old list-of-lists syntax into a tree of Check objects. - - Provided for backwards compatibility. - """ - - # Empty rule defaults to True - if not rule: - return TrueCheck() - - # Outer list is joined by "or"; inner list by "and" - or_list = [] - for inner_rule in rule: - # Elide empty inner lists - if not inner_rule: - continue - - # Handle bare strings - if isinstance(inner_rule, six.string_types): - inner_rule = [inner_rule] - - # Parse the inner rules into Check objects - and_list = [_parse_check(r) for r in inner_rule] - - # Append the appropriate check to the or_list - if len(and_list) == 1: - or_list.append(and_list[0]) - else: - or_list.append(AndCheck(and_list)) - - # If we have only one check, omit the "or" - if not or_list: - return FalseCheck() - elif len(or_list) == 1: - return or_list[0] - - return OrCheck(or_list) - - -# Used for tokenizing the policy language -_tokenize_re = re.compile(r'\s+') - - -def _parse_tokenize(rule): - """Tokenizer for the policy language. - - Most of the single-character tokens are specified in the - _tokenize_re; however, parentheses need to be handled specially, - because they can appear inside a check string. Thankfully, those - parentheses that appear inside a check string can never occur at - the very beginning or end ("%(variable)s" is the correct syntax). - """ - - for tok in _tokenize_re.split(rule): - # Skip empty tokens - if not tok or tok.isspace(): - continue - - # Handle leading parens on the token - clean = tok.lstrip('(') - for i in range(len(tok) - len(clean)): - yield '(', '(' - - # If it was only parentheses, continue - if not clean: - continue - else: - tok = clean - - # Handle trailing parens on the token - clean = tok.rstrip(')') - trail = len(tok) - len(clean) - - # Yield the cleaned token - lowered = clean.lower() - if lowered in ('and', 'or', 'not'): - # Special tokens - yield lowered, clean - elif clean: - # Not a special token, but not composed solely of ')' - if len(tok) >= 2 and ((tok[0], tok[-1]) in - [('"', '"'), ("'", "'")]): - # It's a quoted string - yield 'string', tok[1:-1] - else: - yield 'check', _parse_check(clean) - - # Yield the trailing parens - for i in range(trail): - yield ')', ')' - - -class _ParseStateMeta(type): - """Metaclass for the :class:`.ParseState` class. - - Facilitates identifying reduction methods. - """ - - def __new__(mcs, name, bases, cls_dict): - """Create the class. - - Injects the 'reducers' list, a list of tuples matching token sequences - to the names of the corresponding reduction methods. - """ - - reducers = [] - - for key, value in cls_dict.items(): - if not hasattr(value, 'reducers'): - continue - for reduction in value.reducers: - reducers.append((reduction, key)) - - cls_dict['reducers'] = reducers - - return super(_ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict) - - -def reducer(*tokens): - """Decorator for reduction methods. - - Arguments are a sequence of tokens, in order, which should trigger running - this reduction method. - """ - - def decorator(func): - # Make sure we have a list of reducer sequences - if not hasattr(func, 'reducers'): - func.reducers = [] - - # Add the tokens to the list of reducer sequences - func.reducers.append(list(tokens)) - - return func - - return decorator - - -@six.add_metaclass(_ParseStateMeta) -class _ParseState(object): - """Implement the core of parsing the policy language. - - Uses a greedy reduction algorithm to reduce a sequence of tokens into - a single terminal, the value of which will be the root of the - :class:`Check` tree. - - .. note:: - - Error reporting is rather lacking. The best we can get with this - parser formulation is an overall "parse failed" error. Fortunately, the - policy language is simple enough that this shouldn't be that big a - problem. - """ - - def __init__(self): - """Initialize the _ParseState.""" - - self.tokens = [] - self.values = [] - - def reduce(self): - """Perform a greedy reduction of the token stream. - - If a reducer method matches, it will be executed, then the - :meth:`reduce` method will be called recursively to search for any more - possible reductions. - """ - - for reduction, methname in self.reducers: - if (len(self.tokens) >= len(reduction) and - self.tokens[-len(reduction):] == reduction): - # Get the reduction method - meth = getattr(self, methname) - - # Reduce the token stream - results = meth(*self.values[-len(reduction):]) - - # Update the tokens and values - self.tokens[-len(reduction):] = [r[0] for r in results] - self.values[-len(reduction):] = [r[1] for r in results] - - # Check for any more reductions - return self.reduce() - - def shift(self, tok, value): - """Adds one more token to the state. - - Calls :meth:`reduce`. - """ - - self.tokens.append(tok) - self.values.append(value) - - # Do a greedy reduce... - self.reduce() - - @property - def result(self): - """Obtain the final result of the parse. - - :raises ValueError: If the parse failed to reduce to a single result. - """ - - if len(self.values) != 1: - raise ValueError("Could not parse rule") - return self.values[0] - - @reducer('(', 'check', ')') - @reducer('(', 'and_expr', ')') - @reducer('(', 'or_expr', ')') - def _wrap_check(self, _p1, check, _p2): - """Turn parenthesized expressions into a 'check' token.""" - - return [('check', check)] - - @reducer('check', 'and', 'check') - def _make_and_expr(self, check1, _and, check2): - """Create an 'and_expr'. - - Join two checks by the 'and' operator. - """ - - return [('and_expr', AndCheck([check1, check2]))] - - @reducer('and_expr', 'and', 'check') - def _extend_and_expr(self, and_expr, _and, check): - """Extend an 'and_expr' by adding one more check.""" - - return [('and_expr', and_expr.add_check(check))] - - @reducer('check', 'or', 'check') - def _make_or_expr(self, check1, _or, check2): - """Create an 'or_expr'. - - Join two checks by the 'or' operator. - """ - - return [('or_expr', OrCheck([check1, check2]))] - - @reducer('or_expr', 'or', 'check') - def _extend_or_expr(self, or_expr, _or, check): - """Extend an 'or_expr' by adding one more check.""" - - return [('or_expr', or_expr.add_check(check))] - - @reducer('not', 'check') - def _make_not_expr(self, _not, check): - """Invert the result of another check.""" - - return [('check', NotCheck(check))] - - -def _parse_text_rule(rule): - """Parses policy to the tree. - - Translates a policy written in the policy language into a tree of - Check objects. - """ - - # Empty rule means always accept - if not rule: - return TrueCheck() - - # Parse the token stream - state = _ParseState() - for tok, value in _parse_tokenize(rule): - state.shift(tok, value) - - try: - return state.result - except ValueError: - # Couldn't parse the rule - LOG.exception(_LE("Failed to understand rule %s") % rule) - - # Fail closed - return FalseCheck() - - -def _parse_rule(rule): - """Parses a policy rule into a tree of :class:`.Check` objects.""" - - # If the rule is a string, it's in the policy language - if isinstance(rule, six.string_types): - return _parse_text_rule(rule) - return _parse_list_rule(rule) - - -def register(name, func=None): - """Register a function or :class:`.Check` class as a policy check. - - :param name: Gives the name of the check type, e.g., "rule", - "role", etc. If name is ``None``, a default check type - will be registered. - :param func: If given, provides the function or class to register. - If not given, returns a function taking one argument - to specify the function or class to register, - allowing use as a decorator. - """ - - # Perform the actual decoration by registering the function or - # class. Returns the function or class for compliance with the - # decorator interface. - def decorator(func): - _checks[name] = func - return func - - # If the function or class is given, do the registration - if func: - return decorator(func) - - return decorator - - -@register("rule") -class RuleCheck(Check): - """Recursively checks credentials based on the defined rules.""" - - def __call__(self, target, creds, enforcer): - try: - return enforcer.rules[self.match](target, creds, enforcer) - except KeyError: - # We don't have any matching rule; fail closed - return False - - -@register("role") -class RoleCheck(Check): - """Check that there is a matching role in the ``creds`` dict.""" - - def __call__(self, target, creds, enforcer): - return self.match.lower() in [x.lower() for x in creds['roles']] - - -@register('http') -class HttpCheck(Check): - """Check ``http:`` rules by calling to a remote server. - - This example implementation simply verifies that the response - is exactly ``True``. - """ - - def __call__(self, target, creds, enforcer): - url = ('http:' + self.match) % target - - # Convert instances of object() in target temporarily to - # empty dict to avoid circular reference detection - # errors in jsonutils.dumps(). - temp_target = copy.deepcopy(target) - for key in target.keys(): - element = target.get(key) - if type(element) is object: - temp_target[key] = {} - - data = {'target': jsonutils.dumps(temp_target), - 'credentials': jsonutils.dumps(creds)} - post_data = urlparse.urlencode(data) - f = urlrequest.urlopen(url, post_data) - return f.read() == "True" - - -@register(None) -class GenericCheck(Check): - """Check an individual match. - - Matches look like: - - - tenant:%(tenant_id)s - - role:compute:admin - - True:%(user.enabled)s - - 'Member':%(role.name)s - """ - - def __call__(self, target, creds, enforcer): - try: - match = self.match % target - except KeyError: - # While doing GenericCheck if key not - # present in Target return false - return False - - try: - # Try to interpret self.kind as a literal - leftval = ast.literal_eval(self.kind) - except ValueError: - try: - kind_parts = self.kind.split('.') - leftval = creds - for kind_part in kind_parts: - leftval = leftval[kind_part] - except KeyError: - return False - return match == six.text_type(leftval) diff --git a/oslo_policy/tests/base.py b/oslo_policy/tests/base.py new file mode 100644 index 00000000..e166cb2f --- /dev/null +++ b/oslo_policy/tests/base.py @@ -0,0 +1,52 @@ +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os.path + +from oslo_concurrency.fixture import lockutils +from oslo_config import cfg +from oslo_config import fixture as config +from oslotest import base as test_base + +from oslo_policy import policy + +TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), + '..', 'tests/var')) +ENFORCER = policy.Enforcer(cfg.CONF) + + +class PolicyBaseTestCase(test_base.BaseTestCase): + + def setUp(self): + super(PolicyBaseTestCase, self).setUp() + # NOTE(bnemec): Many of these tests use the same ENFORCER object, so + # I believe we need to serialize them. + self.useFixture(lockutils.LockFixture('policy-lock')) + self.CONF = self.useFixture(config.Config()).conf + self.CONF(args=['--config-dir', TEST_VAR_DIR]) + self.enforcer = ENFORCER + self.addCleanup(self.enforcer.clear) + + +class FakeCheck(policy.BaseCheck): + def __init__(self, result=None): + self.result = result + + def __str__(self): + return str(self.result) + + def __call__(self, target, creds, enforcer): + if self.result is not None: + return self.result + return (target, creds, enforcer) diff --git a/oslo_policy/tests/test_checks.py b/oslo_policy/tests/test_checks.py new file mode 100644 index 00000000..93818e45 --- /dev/null +++ b/oslo_policy/tests/test_checks.py @@ -0,0 +1,402 @@ +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import mock +from oslo_serialization import jsonutils +from oslotest import base as test_base +import six +import six.moves.urllib.parse as urlparse +import six.moves.urllib.request as urlrequest + +from oslo_policy import _checks +from oslo_policy import policy +from oslo_policy.tests import base + + +ENFORCER = base.ENFORCER + + +class CheckRegisterTestCase(test_base.BaseTestCase): + @mock.patch.object(_checks, 'registered_checks', {}) + def test_register_check(self): + class TestCheck(_checks.Check): + pass + + policy.register('spam', TestCheck) + + self.assertEqual(_checks.registered_checks, dict(spam=TestCheck)) + + @mock.patch.object(_checks, 'registered_checks', {}) + def test_register_check_decorator(self): + @policy.register('spam') + class TestCheck(_checks.Check): + pass + + self.assertEqual(_checks.registered_checks, dict(spam=TestCheck)) + + +class RuleCheckTestCase(test_base.BaseTestCase): + @mock.patch.object(ENFORCER, 'rules', {}) + def test_rule_missing(self): + check = _checks.RuleCheck('rule', 'spam') + + self.assertEqual(check('target', 'creds', ENFORCER), False) + + @mock.patch.object(ENFORCER, 'rules', + dict(spam=mock.Mock(return_value=False))) + def test_rule_false(self): + enforcer = ENFORCER + + check = _checks.RuleCheck('rule', 'spam') + + self.assertEqual(check('target', 'creds', enforcer), False) + enforcer.rules['spam'].assert_called_once_with('target', 'creds', + enforcer) + + @mock.patch.object(ENFORCER, 'rules', + dict(spam=mock.Mock(return_value=True))) + def test_rule_true(self): + enforcer = ENFORCER + check = _checks.RuleCheck('rule', 'spam') + + self.assertEqual(check('target', 'creds', enforcer), True) + enforcer.rules['spam'].assert_called_once_with('target', 'creds', + enforcer) + + +class RoleCheckTestCase(base.PolicyBaseTestCase): + def test_accept(self): + check = _checks.RoleCheck('role', 'sPaM') + + self.assertEqual(check('target', dict(roles=['SpAm']), + self.enforcer), True) + + def test_reject(self): + check = _checks.RoleCheck('role', 'spam') + + self.assertEqual(check('target', dict(roles=[]), self.enforcer), False) + + +class HttpCheckTestCase(base.PolicyBaseTestCase): + def decode_post_data(self, post_data): + result = {} + for item in post_data.split('&'): + key, _sep, value = item.partition('=') + result[key] = jsonutils.loads(urlparse.unquote_plus(value)) + + return result + + @mock.patch.object(urlrequest, 'urlopen', + return_value=six.StringIO('True')) + def test_accept(self, mock_urlopen): + check = _checks.HttpCheck('http', '//example.com/%(name)s') + self.assertEqual(check(dict(name='target', spam='spammer'), + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer), + True) + self.assertEqual(mock_urlopen.call_count, 1) + + args = mock_urlopen.call_args[0] + + self.assertEqual(args[0], 'http://example.com/target') + self.assertEqual(self.decode_post_data(args[1]), dict( + target=dict(name='target', spam='spammer'), + credentials=dict(user='user', roles=['a', 'b', 'c']), + )) + + @mock.patch.object(urlrequest, 'urlopen', + return_value=six.StringIO('other')) + def test_reject(self, mock_urlopen): + check = _checks.HttpCheck('http', '//example.com/%(name)s') + + self.assertEqual(check(dict(name='target', spam='spammer'), + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer), + False) + self.assertEqual(mock_urlopen.call_count, 1) + + args = mock_urlopen.call_args[0] + + self.assertEqual(args[0], 'http://example.com/target') + self.assertEqual(self.decode_post_data(args[1]), dict( + target=dict(name='target', spam='spammer'), + credentials=dict(user='user', roles=['a', 'b', 'c']), + )) + + @mock.patch.object(urlrequest, 'urlopen', + return_value=six.StringIO('True')) + def test_http_with_objects_in_target(self, mock_urlopen): + + check = _checks.HttpCheck('http', '//example.com/%(name)s') + target = {'a': object(), + 'name': 'target', + 'b': 'test data'} + self.assertEqual(check(target, + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer), + True) + + @mock.patch.object(urlrequest, 'urlopen', + return_value=six.StringIO('True')) + def test_http_with_strings_in_target(self, mock_urlopen): + check = _checks.HttpCheck('http', '//example.com/%(name)s') + target = {'a': 'some_string', + 'name': 'target', + 'b': 'test data'} + self.assertEqual(check(target, + dict(user='user', roles=['a', 'b', 'c']), + self.enforcer), + True) + + +class GenericCheckTestCase(base.PolicyBaseTestCase): + def test_no_cred(self): + check = _checks.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False) + + def test_cred_mismatch(self): + check = _checks.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name='spam'), + dict(name='ham'), + self.enforcer), False) + + def test_accept(self): + check = _checks.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name='spam'), + dict(name='spam'), + self.enforcer), True) + + def test_no_key_match_in_target(self): + check = _checks.GenericCheck('name', '%(name)s') + + self.assertEqual(check(dict(name1='spam'), + dict(name='spam'), + self.enforcer), False) + + def test_constant_string_mismatch(self): + check = _checks.GenericCheck("'spam'", '%(name)s') + + self.assertEqual(check(dict(name='ham'), + {}, + self.enforcer), False) + + def test_constant_string_accept(self): + check = _checks.GenericCheck("'spam'", '%(name)s') + + self.assertEqual(check(dict(name='spam'), + {}, + self.enforcer), True) + + def test_constant_literal_mismatch(self): + check = _checks.GenericCheck("True", '%(enabled)s') + + self.assertEqual(check(dict(enabled=False), + {}, + self.enforcer), False) + + def test_constant_literal_accept(self): + check = _checks.GenericCheck("True", '%(enabled)s') + + self.assertEqual(check(dict(enabled=True), + {}, + self.enforcer), True) + + def test_deep_credentials_dictionary_lookup(self): + check = _checks.GenericCheck("a.b.c.d", 'APPLES') + + credentials = {'a': {'b': {'c': {'d': 'APPLES'}}}} + + self.assertEqual(check({}, + credentials, + self.enforcer), True) + + def test_missing_credentials_dictionary_lookup(self): + credentials = {'a': 'APPLES', 'o': {'t': 'ORANGES'}} + + # First a valid check - rest of case is expecting failures + # Should prove the basic credentials structure before we test + # for failure cases. + check = _checks.GenericCheck("o.t", 'ORANGES') + self.assertEqual(check({}, + credentials, + self.enforcer), True) + + # Case where final key is missing + check = _checks.GenericCheck("o.v", 'ORANGES') + self.assertEqual(check({}, + credentials, + self.enforcer), False) + + # Attempt to access key under a missing dictionary + check = _checks.GenericCheck("q.v", 'APPLES') + self.assertEqual(check({}, + credentials, + self.enforcer), False) + + +class FalseCheckTestCase(test_base.BaseTestCase): + def test_str(self): + check = _checks.FalseCheck() + + self.assertEqual(str(check), '!') + + def test_call(self): + check = _checks.FalseCheck() + + self.assertEqual(check('target', 'creds', None), False) + + +class TrueCheckTestCase(test_base.BaseTestCase): + def test_str(self): + check = _checks.TrueCheck() + + self.assertEqual(str(check), '@') + + def test_call(self): + check = _checks.TrueCheck() + + self.assertEqual(check('target', 'creds', None), True) + + +class CheckForTest(_checks.Check): + def __call__(self, target, creds, enforcer): + pass + + +class CheckTestCase(test_base.BaseTestCase): + def test_init(self): + check = CheckForTest('kind', 'match') + + self.assertEqual(check.kind, 'kind') + self.assertEqual(check.match, 'match') + + def test_str(self): + check = CheckForTest('kind', 'match') + + self.assertEqual(str(check), 'kind:match') + + +class NotCheckTestCase(test_base.BaseTestCase): + def test_init(self): + check = _checks.NotCheck('rule') + + self.assertEqual(check.rule, 'rule') + + def test_str(self): + check = _checks.NotCheck('rule') + + self.assertEqual(str(check), 'not rule') + + def test_call_true(self): + rule = mock.Mock(return_value=True) + check = _checks.NotCheck(rule) + + self.assertEqual(check('target', 'cred', None), False) + rule.assert_called_once_with('target', 'cred', None) + + def test_call_false(self): + rule = mock.Mock(return_value=False) + check = _checks.NotCheck(rule) + + self.assertEqual(check('target', 'cred', None), True) + rule.assert_called_once_with('target', 'cred', None) + + +class AndCheckTestCase(test_base.BaseTestCase): + def test_init(self): + check = _checks.AndCheck(['rule1', 'rule2']) + + self.assertEqual(check.rules, ['rule1', 'rule2']) + + def test_add_check(self): + check = _checks.AndCheck(['rule1', 'rule2']) + check.add_check('rule3') + + self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3']) + + def test_str(self): + check = _checks.AndCheck(['rule1', 'rule2']) + + self.assertEqual(str(check), '(rule1 and rule2)') + + def test_call_all_false(self): + rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)] + check = _checks.AndCheck(rules) + + self.assertEqual(check('target', 'cred', None), False) + rules[0].assert_called_once_with('target', 'cred', None) + self.assertFalse(rules[1].called) + + def test_call_first_true(self): + rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)] + check = _checks.AndCheck(rules) + + self.assertFalse(check('target', 'cred', None)) + rules[0].assert_called_once_with('target', 'cred', None) + rules[1].assert_called_once_with('target', 'cred', None) + + def test_call_second_true(self): + rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)] + check = _checks.AndCheck(rules) + + self.assertFalse(check('target', 'cred', None)) + rules[0].assert_called_once_with('target', 'cred', None) + self.assertFalse(rules[1].called) + + +class OrCheckTestCase(test_base.BaseTestCase): + def test_init(self): + check = _checks.OrCheck(['rule1', 'rule2']) + + self.assertEqual(check.rules, ['rule1', 'rule2']) + + def test_add_check(self): + check = _checks.OrCheck(['rule1', 'rule2']) + check.add_check('rule3') + + self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3']) + + def test_str(self): + check = _checks.OrCheck(['rule1', 'rule2']) + + self.assertEqual(str(check), '(rule1 or rule2)') + + def test_call_all_false(self): + rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)] + check = _checks.OrCheck(rules) + + self.assertEqual(check('target', 'cred', None), False) + rules[0].assert_called_once_with('target', 'cred', None) + rules[1].assert_called_once_with('target', 'cred', None) + + def test_call_first_true(self): + rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)] + check = _checks.OrCheck(rules) + + self.assertEqual(check('target', 'cred', None), True) + rules[0].assert_called_once_with('target', 'cred', None) + self.assertFalse(rules[1].called) + + def test_call_second_true(self): + rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)] + check = _checks.OrCheck(rules) + + self.assertEqual(check('target', 'cred', None), True) + rules[0].assert_called_once_with('target', 'cred', None) + rules[1].assert_called_once_with('target', 'cred', None) diff --git a/oslo_policy/tests/test_parser.py b/oslo_policy/tests/test_parser.py new file mode 100644 index 00000000..bae3cb71 --- /dev/null +++ b/oslo_policy/tests/test_parser.py @@ -0,0 +1,403 @@ +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslotest import base as test_base +import six + +from oslo_policy import _checks +from oslo_policy import _parser +from oslo_policy.tests import base + + +class ParseCheckTestCase(test_base.BaseTestCase): + def test_false(self): + result = _parser._parse_check('!') + + self.assertTrue(isinstance(result, _checks.FalseCheck)) + + def test_true(self): + result = _parser._parse_check('@') + + self.assertTrue(isinstance(result, _checks.TrueCheck)) + + def test_bad_rule(self): + result = _parser._parse_check('foobar') + + self.assertTrue(isinstance(result, _checks.FalseCheck)) + + @mock.patch.object(_checks, 'registered_checks', {}) + def test_no_handler(self): + result = _parser._parse_check('no:handler') + + self.assertTrue(isinstance(result, _checks.FalseCheck)) + + @mock.patch.object(_checks, 'registered_checks', { + 'spam': mock.Mock(return_value="spam_check"), + None: mock.Mock(return_value="none_check"), + }) + def test_check(self): + result = _parser._parse_check('spam:handler') + + self.assertEqual(result, 'spam_check') + _checks.registered_checks['spam'].assert_called_once_with('spam', + 'handler') + self.assertFalse(_checks.registered_checks[None].called) + + @mock.patch.object(_checks, 'registered_checks', { + None: mock.Mock(return_value="none_check"), + }) + def test_check_default(self): + result = _parser._parse_check('spam:handler') + + self.assertEqual(result, 'none_check') + _checks.registered_checks[None].assert_called_once_with('spam', + 'handler') + + +class ParseListRuleTestCase(test_base.BaseTestCase): + def test_empty(self): + result = _parser._parse_list_rule([]) + + self.assertTrue(isinstance(result, _checks.TrueCheck)) + self.assertEqual(str(result), '@') + + @mock.patch.object(_parser, '_parse_check', base.FakeCheck) + def test_oneele_zeroele(self): + result = _parser._parse_list_rule([[]]) + + self.assertTrue(isinstance(result, _checks.FalseCheck)) + self.assertEqual(str(result), '!') + + @mock.patch.object(_parser, '_parse_check', base.FakeCheck) + def test_oneele_bare(self): + result = _parser._parse_list_rule(['rule']) + + self.assertTrue(isinstance(result, base.FakeCheck)) + self.assertEqual(result.result, 'rule') + self.assertEqual(str(result), 'rule') + + @mock.patch.object(_parser, '_parse_check', base.FakeCheck) + def test_oneele_oneele(self): + result = _parser._parse_list_rule([['rule']]) + + self.assertTrue(isinstance(result, base.FakeCheck)) + self.assertEqual(result.result, 'rule') + self.assertEqual(str(result), 'rule') + + @mock.patch.object(_parser, '_parse_check', base.FakeCheck) + def test_oneele_multi(self): + result = _parser._parse_list_rule([['rule1', 'rule2']]) + + self.assertTrue(isinstance(result, _checks.AndCheck)) + self.assertEqual(len(result.rules), 2) + for i, value in enumerate(['rule1', 'rule2']): + self.assertTrue(isinstance(result.rules[i], base.FakeCheck)) + self.assertEqual(result.rules[i].result, value) + self.assertEqual(str(result), '(rule1 and rule2)') + + @mock.patch.object(_parser, '_parse_check', base.FakeCheck) + def test_multi_oneele(self): + result = _parser._parse_list_rule([['rule1'], ['rule2']]) + + self.assertTrue(isinstance(result, _checks.OrCheck)) + self.assertEqual(len(result.rules), 2) + for i, value in enumerate(['rule1', 'rule2']): + self.assertTrue(isinstance(result.rules[i], base.FakeCheck)) + self.assertEqual(result.rules[i].result, value) + self.assertEqual(str(result), '(rule1 or rule2)') + + @mock.patch.object(_parser, '_parse_check', base.FakeCheck) + def test_multi_multi(self): + result = _parser._parse_list_rule([['rule1', 'rule2'], + ['rule3', 'rule4']]) + + self.assertTrue(isinstance(result, _checks.OrCheck)) + self.assertEqual(len(result.rules), 2) + for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]): + self.assertTrue(isinstance(result.rules[i], _checks.AndCheck)) + self.assertEqual(len(result.rules[i].rules), 2) + for j, value in enumerate(values): + self.assertTrue(isinstance(result.rules[i].rules[j], + base.FakeCheck)) + self.assertEqual(result.rules[i].rules[j].result, value) + self.assertEqual(str(result), + '((rule1 and rule2) or (rule3 and rule4))') + + +class ParseTokenizeTestCase(test_base.BaseTestCase): + @mock.patch.object(_parser, '_parse_check', lambda x: x) + def test_tokenize(self): + exemplar = ("(( ( ((() And)) or ) (check:%(miss)s) not)) " + "'a-string' \"another-string\"") + expected = [ + ('(', '('), ('(', '('), ('(', '('), ('(', '('), ('(', '('), + ('(', '('), (')', ')'), ('and', 'And'), + (')', ')'), (')', ')'), ('or', 'or'), (')', ')'), ('(', '('), + ('check', 'check:%(miss)s'), (')', ')'), ('not', 'not'), + (')', ')'), (')', ')'), + ('string', 'a-string'), + ('string', 'another-string'), + ] + + result = list(_parser._parse_tokenize(exemplar)) + + self.assertEqual(result, expected) + + +class ParseStateMetaTestCase(test_base.BaseTestCase): + def test_reducer(self): + @_parser.reducer('a', 'b', 'c') + @_parser.reducer('d', 'e', 'f') + def spam(): + pass + + self.assertTrue(hasattr(spam, 'reducers')) + self.assertEqual(spam.reducers, [['d', 'e', 'f'], ['a', 'b', 'c']]) + + def test_parse_state_meta(self): + @six.add_metaclass(_parser.ParseStateMeta) + class FakeState(object): + + @_parser.reducer('a', 'b', 'c') + @_parser.reducer('d', 'e', 'f') + def reduce1(self): + pass + + @_parser.reducer('g', 'h', 'i') + def reduce2(self): + pass + + self.assertTrue(hasattr(FakeState, 'reducers')) + for reduction, reducer in FakeState.reducers: + if (reduction == ['a', 'b', 'c'] or + reduction == ['d', 'e', 'f']): + self.assertEqual(reducer, 'reduce1') + elif reduction == ['g', 'h', 'i']: + self.assertEqual(reducer, 'reduce2') + else: + self.fail("Unrecognized reducer discovered") + + +class ParseStateTestCase(test_base.BaseTestCase): + def test_init(self): + state = _parser.ParseState() + + self.assertEqual(state.tokens, []) + self.assertEqual(state.values, []) + + @mock.patch.object(_parser.ParseState, 'reducers', [(['tok1'], 'meth')]) + @mock.patch.object(_parser.ParseState, 'meth', create=True) + def test_reduce_none(self, mock_meth): + state = _parser.ParseState() + state.tokens = ['tok2'] + state.values = ['val2'] + + state.reduce() + + self.assertEqual(state.tokens, ['tok2']) + self.assertEqual(state.values, ['val2']) + self.assertFalse(mock_meth.called) + + @mock.patch.object(_parser.ParseState, 'reducers', + [(['tok1', 'tok2'], 'meth')]) + @mock.patch.object(_parser.ParseState, 'meth', create=True) + def test_reduce_short(self, mock_meth): + state = _parser.ParseState() + state.tokens = ['tok1'] + state.values = ['val1'] + + state.reduce() + + self.assertEqual(state.tokens, ['tok1']) + self.assertEqual(state.values, ['val1']) + self.assertFalse(mock_meth.called) + + @mock.patch.object(_parser.ParseState, 'reducers', + [(['tok1', 'tok2'], 'meth')]) + @mock.patch.object(_parser.ParseState, 'meth', create=True, + return_value=[('tok3', 'val3')]) + def test_reduce_one(self, mock_meth): + state = _parser.ParseState() + state.tokens = ['tok1', 'tok2'] + state.values = ['val1', 'val2'] + + state.reduce() + + self.assertEqual(state.tokens, ['tok3']) + self.assertEqual(state.values, ['val3']) + mock_meth.assert_called_once_with('val1', 'val2') + + @mock.patch.object(_parser.ParseState, 'reducers', [ + (['tok1', 'tok4'], 'meth2'), + (['tok2', 'tok3'], 'meth1'), + ]) + @mock.patch.object(_parser.ParseState, 'meth1', create=True, + return_value=[('tok4', 'val4')]) + @mock.patch.object(_parser.ParseState, 'meth2', create=True, + return_value=[('tok5', 'val5')]) + def test_reduce_two(self, mock_meth2, mock_meth1): + state = _parser.ParseState() + state.tokens = ['tok1', 'tok2', 'tok3'] + state.values = ['val1', 'val2', 'val3'] + + state.reduce() + + self.assertEqual(state.tokens, ['tok5']) + self.assertEqual(state.values, ['val5']) + mock_meth1.assert_called_once_with('val2', 'val3') + mock_meth2.assert_called_once_with('val1', 'val4') + + @mock.patch.object(_parser.ParseState, 'reducers', + [(['tok1', 'tok2'], 'meth')]) + @mock.patch.object(_parser.ParseState, 'meth', create=True, + return_value=[('tok3', 'val3'), ('tok4', 'val4')]) + def test_reduce_multi(self, mock_meth): + state = _parser.ParseState() + state.tokens = ['tok1', 'tok2'] + state.values = ['val1', 'val2'] + + state.reduce() + + self.assertEqual(state.tokens, ['tok3', 'tok4']) + self.assertEqual(state.values, ['val3', 'val4']) + mock_meth.assert_called_once_with('val1', 'val2') + + def test_shift(self): + state = _parser.ParseState() + + with mock.patch.object(_parser.ParseState, 'reduce') as mock_reduce: + state.shift('token', 'value') + + self.assertEqual(state.tokens, ['token']) + self.assertEqual(state.values, ['value']) + mock_reduce.assert_called_once_with() + + def test_result_empty(self): + state = _parser.ParseState() + + self.assertRaises(ValueError, lambda: state.result) + + def test_result_unreduced(self): + state = _parser.ParseState() + state.tokens = ['tok1', 'tok2'] + state.values = ['val1', 'val2'] + + self.assertRaises(ValueError, lambda: state.result) + + def test_result(self): + state = _parser.ParseState() + state.tokens = ['token'] + state.values = ['value'] + + self.assertEqual(state.result, 'value') + + def test_wrap_check(self): + state = _parser.ParseState() + + result = state._wrap_check('(', 'the_check', ')') + + self.assertEqual(result, [('check', 'the_check')]) + + @mock.patch.object(_checks, 'AndCheck', lambda x: x) + def test_make_and_expr(self): + state = _parser.ParseState() + + result = state._make_and_expr('check1', 'and', 'check2') + + self.assertEqual(result, [('and_expr', ['check1', 'check2'])]) + + def test_extend_and_expr(self): + state = _parser.ParseState() + mock_expr = mock.Mock() + mock_expr.add_check.return_value = 'newcheck' + + result = state._extend_and_expr(mock_expr, 'and', 'check') + + self.assertEqual(result, [('and_expr', 'newcheck')]) + mock_expr.add_check.assert_called_once_with('check') + + @mock.patch.object(_checks, 'OrCheck', lambda x: x) + def test_make_or_expr(self): + state = _parser.ParseState() + + result = state._make_or_expr('check1', 'or', 'check2') + + self.assertEqual(result, [('or_expr', ['check1', 'check2'])]) + + def test_extend_or_expr(self): + state = _parser.ParseState() + mock_expr = mock.Mock() + mock_expr.add_check.return_value = 'newcheck' + + result = state._extend_or_expr(mock_expr, 'or', 'check') + + self.assertEqual(result, [('or_expr', 'newcheck')]) + mock_expr.add_check.assert_called_once_with('check') + + @mock.patch.object(_checks, 'NotCheck', lambda x: 'not %s' % x) + def test_make_not_expr(self): + state = _parser.ParseState() + + result = state._make_not_expr('not', 'check') + + self.assertEqual(result, [('check', 'not check')]) + + +class ParseTextRuleTestCase(test_base.BaseTestCase): + def test_empty(self): + result = _parser._parse_text_rule('') + + self.assertTrue(isinstance(result, _checks.TrueCheck)) + + @mock.patch.object(_parser, '_parse_tokenize', + return_value=[('tok1', 'val1'), ('tok2', 'val2')]) + @mock.patch.object(_parser.ParseState, 'shift') + @mock.patch.object(_parser.ParseState, 'result', 'result') + def test_shifts(self, mock_shift, mock_parse_tokenize): + result = _parser._parse_text_rule('test rule') + + self.assertEqual(result, 'result') + mock_parse_tokenize.assert_called_once_with('test rule') + mock_shift.assert_has_calls( + [mock.call('tok1', 'val1'), mock.call('tok2', 'val2')]) + + @mock.patch.object(_parser, '_parse_tokenize', return_value=[]) + def test_fail(self, mock_parse_tokenize): + result = _parser._parse_text_rule('test rule') + + self.assertTrue(isinstance(result, _checks.FalseCheck)) + mock_parse_tokenize.assert_called_once_with('test rule') + + +class ParseRuleTestCase(test_base.BaseTestCase): + @mock.patch.object(_parser, '_parse_text_rule', return_value='text rule') + @mock.patch.object(_parser, '_parse_list_rule', return_value='list rule') + def test_parse_rule_string(self, mock_parse_list_rule, + mock_parse_text_rule): + result = _parser.parse_rule("a string") + + self.assertEqual(result, 'text rule') + self.assertFalse(mock_parse_list_rule.called) + mock_parse_text_rule.assert_called_once_with('a string') + + @mock.patch.object(_parser, '_parse_text_rule', return_value='text rule') + @mock.patch.object(_parser, '_parse_list_rule', return_value='list rule') + def test_parse_rule_list(self, mock_parse_list_rule, mock_parse_text_rule): + result = _parser.parse_rule([['a'], ['list']]) + + self.assertEqual(result, 'list rule') + self.assertFalse(mock_parse_text_rule.called) + mock_parse_list_rule.assert_called_once_with([['a'], ['list']]) diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py index 145a9574..aed785cd 100644 --- a/oslo_policy/tests/test_policy.py +++ b/oslo_policy/tests/test_policy.py @@ -15,26 +15,16 @@ """Test of Policy Engine""" -import os - import mock -from oslo_concurrency.fixture import lockutils from oslo_config import cfg -from oslo_config import fixture as config from oslo_serialization import jsonutils from oslotest import base as test_base -import six -import six.moves.urllib.parse as urlparse -import six.moves.urllib.request as urlrequest +from oslo_policy import _checks +from oslo_policy import _parser from oslo_policy.openstack.common import fileutils from oslo_policy import policy - - -TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), - '..', 'tests/var')) - -ENFORCER = policy.Enforcer(cfg.CONF) +from oslo_policy.tests import base class MyException(Exception): @@ -79,7 +69,7 @@ class RulesTestCase(test_base.BaseTestCase): self.assertEqual(rules['b'], 2) self.assertEqual(rules['c'], 3) - @mock.patch.object(policy, '_parse_rule', lambda x: x) + @mock.patch.object(_parser, 'parse_rule', lambda x: x) def test_load_json(self): exemplar = """{ "admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]], @@ -108,26 +98,13 @@ class RulesTestCase(test_base.BaseTestCase): "admin_or_owner": "" }""" rules = policy.Rules(dict( - admin_or_owner=policy.TrueCheck(), + admin_or_owner=_checks.TrueCheck(), )) self.assertEqual(str(rules), exemplar) -class PolicyBaseTestCase(test_base.BaseTestCase): - - def setUp(self): - super(PolicyBaseTestCase, self).setUp() - # NOTE(bnemec): Many of these tests use the same ENFORCER object, so - # I believe we need to serialize them. - self.useFixture(lockutils.LockFixture('policy-lock')) - self.CONF = self.useFixture(config.Config()).conf - self.CONF(args=['--config-dir', TEST_VAR_DIR]) - self.enforcer = ENFORCER - self.addCleanup(self.enforcer.clear) - - -class EnforcerTest(PolicyBaseTestCase): +class EnforcerTest(base.PolicyBaseTestCase): def test_load_file(self): self.CONF.set_override('policy_dirs', [], group='oslo_policy') @@ -195,7 +172,7 @@ class EnforcerTest(PolicyBaseTestCase): "cloudwatch:PutMetricData": "" }""" rules = policy.Rules.load_json(rules_json) - default_rule = policy.TrueCheck() + default_rule = _checks.TrueCheck() enforcer = policy.Enforcer(cfg.CONF, default_rule=default_rule) enforcer.set_rules(rules) action = "cloudwatch:PutMetricData" @@ -204,9 +181,9 @@ class EnforcerTest(PolicyBaseTestCase): def test_enforcer_force_reload_with_overwrite(self): # Prepare in memory fake policies. - self.enforcer.set_rules({'test': policy._parse_rule('role:test')}, + self.enforcer.set_rules({'test': _parser.parse_rule('role:test')}, use_conf=True) - self.enforcer.set_rules({'default': policy._parse_rule('role:fakeZ')}, + self.enforcer.set_rules({'default': _parser.parse_rule('role:fakeZ')}, overwrite=False, # Keeps 'test' role. use_conf=True) @@ -232,9 +209,9 @@ class EnforcerTest(PolicyBaseTestCase): def test_enforcer_force_reload_without_overwrite(self): # Prepare in memory fake policies. - self.enforcer.set_rules({'test': policy._parse_rule('role:test')}, + self.enforcer.set_rules({'test': _parser.parse_rule('role:test')}, use_conf=True) - self.enforcer.set_rules({'default': policy._parse_rule('role:fakeZ')}, + self.enforcer.set_rules({'default': _parser.parse_rule('role:fakeZ')}, overwrite=False, # Keeps 'test' role. use_conf=True) @@ -346,23 +323,10 @@ class EnforcerTest(PolicyBaseTestCase): self.assertEqual('bar_rule', enforcer.rules.default_rule) -class FakeCheck(policy.BaseCheck): - def __init__(self, result=None): - self.result = result - - def __str__(self): - return str(self.result) - - def __call__(self, target, creds, enforcer): - if self.result is not None: - return self.result - return (target, creds, enforcer) - - -class CheckFunctionTestCase(PolicyBaseTestCase): +class CheckFunctionTestCase(base.PolicyBaseTestCase): def test_check_explicit(self): - rule = FakeCheck() + rule = base.FakeCheck() result = self.enforcer.enforce(rule, "target", "creds") self.assertEqual(result, ("target", "creds", self.enforcer)) @@ -376,13 +340,13 @@ class CheckFunctionTestCase(PolicyBaseTestCase): self.assertEqual(result, False) def test_check_with_rule(self): - self.enforcer.set_rules(dict(default=FakeCheck())) + self.enforcer.set_rules(dict(default=base.FakeCheck())) result = self.enforcer.enforce("default", "target", "creds") self.assertEqual(result, ("target", "creds", self.enforcer)) def test_check_raises(self): - self.enforcer.set_rules(dict(default=policy.FalseCheck())) + self.enforcer.set_rules(dict(default=_checks.FalseCheck())) try: self.enforcer.enforce('rule', 'target', 'creds', @@ -393,755 +357,3 @@ class CheckFunctionTestCase(PolicyBaseTestCase): self.assertEqual(exc.kwargs, dict(kw1="kwarg1", kw2="kwarg2")) else: self.fail("enforcer.enforce() failed to raise requested exception") - - -class FalseCheckTestCase(test_base.BaseTestCase): - def test_str(self): - check = policy.FalseCheck() - - self.assertEqual(str(check), '!') - - def test_call(self): - check = policy.FalseCheck() - - self.assertEqual(check('target', 'creds', None), False) - - -class TrueCheckTestCase(test_base.BaseTestCase): - def test_str(self): - check = policy.TrueCheck() - - self.assertEqual(str(check), '@') - - def test_call(self): - check = policy.TrueCheck() - - self.assertEqual(check('target', 'creds', None), True) - - -class CheckForTest(policy.Check): - def __call__(self, target, creds, enforcer): - pass - - -class CheckTestCase(test_base.BaseTestCase): - def test_init(self): - check = CheckForTest('kind', 'match') - - self.assertEqual(check.kind, 'kind') - self.assertEqual(check.match, 'match') - - def test_str(self): - check = CheckForTest('kind', 'match') - - self.assertEqual(str(check), 'kind:match') - - -class NotCheckTestCase(test_base.BaseTestCase): - def test_init(self): - check = policy.NotCheck('rule') - - self.assertEqual(check.rule, 'rule') - - def test_str(self): - check = policy.NotCheck('rule') - - self.assertEqual(str(check), 'not rule') - - def test_call_true(self): - rule = mock.Mock(return_value=True) - check = policy.NotCheck(rule) - - self.assertEqual(check('target', 'cred', None), False) - rule.assert_called_once_with('target', 'cred', None) - - def test_call_false(self): - rule = mock.Mock(return_value=False) - check = policy.NotCheck(rule) - - self.assertEqual(check('target', 'cred', None), True) - rule.assert_called_once_with('target', 'cred', None) - - -class AndCheckTestCase(test_base.BaseTestCase): - def test_init(self): - check = policy.AndCheck(['rule1', 'rule2']) - - self.assertEqual(check.rules, ['rule1', 'rule2']) - - def test_add_check(self): - check = policy.AndCheck(['rule1', 'rule2']) - check.add_check('rule3') - - self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3']) - - def test_str(self): - check = policy.AndCheck(['rule1', 'rule2']) - - self.assertEqual(str(check), '(rule1 and rule2)') - - def test_call_all_false(self): - rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)] - check = policy.AndCheck(rules) - - self.assertEqual(check('target', 'cred', None), False) - rules[0].assert_called_once_with('target', 'cred', None) - self.assertFalse(rules[1].called) - - def test_call_first_true(self): - rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)] - check = policy.AndCheck(rules) - - self.assertFalse(check('target', 'cred', None)) - rules[0].assert_called_once_with('target', 'cred', None) - rules[1].assert_called_once_with('target', 'cred', None) - - def test_call_second_true(self): - rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)] - check = policy.AndCheck(rules) - - self.assertFalse(check('target', 'cred', None)) - rules[0].assert_called_once_with('target', 'cred', None) - self.assertFalse(rules[1].called) - - -class OrCheckTestCase(test_base.BaseTestCase): - def test_init(self): - check = policy.OrCheck(['rule1', 'rule2']) - - self.assertEqual(check.rules, ['rule1', 'rule2']) - - def test_add_check(self): - check = policy.OrCheck(['rule1', 'rule2']) - check.add_check('rule3') - - self.assertEqual(check.rules, ['rule1', 'rule2', 'rule3']) - - def test_str(self): - check = policy.OrCheck(['rule1', 'rule2']) - - self.assertEqual(str(check), '(rule1 or rule2)') - - def test_call_all_false(self): - rules = [mock.Mock(return_value=False), mock.Mock(return_value=False)] - check = policy.OrCheck(rules) - - self.assertEqual(check('target', 'cred', None), False) - rules[0].assert_called_once_with('target', 'cred', None) - rules[1].assert_called_once_with('target', 'cred', None) - - def test_call_first_true(self): - rules = [mock.Mock(return_value=True), mock.Mock(return_value=False)] - check = policy.OrCheck(rules) - - self.assertEqual(check('target', 'cred', None), True) - rules[0].assert_called_once_with('target', 'cred', None) - self.assertFalse(rules[1].called) - - def test_call_second_true(self): - rules = [mock.Mock(return_value=False), mock.Mock(return_value=True)] - check = policy.OrCheck(rules) - - self.assertEqual(check('target', 'cred', None), True) - rules[0].assert_called_once_with('target', 'cred', None) - rules[1].assert_called_once_with('target', 'cred', None) - - -class ParseCheckTestCase(test_base.BaseTestCase): - def test_false(self): - result = policy._parse_check('!') - - self.assertTrue(isinstance(result, policy.FalseCheck)) - - def test_true(self): - result = policy._parse_check('@') - - self.assertTrue(isinstance(result, policy.TrueCheck)) - - def test_bad_rule(self): - result = policy._parse_check('foobar') - - self.assertTrue(isinstance(result, policy.FalseCheck)) - - @mock.patch.object(policy, '_checks', {}) - def test_no_handler(self): - result = policy._parse_check('no:handler') - - self.assertTrue(isinstance(result, policy.FalseCheck)) - - @mock.patch.object(policy, '_checks', { - 'spam': mock.Mock(return_value="spam_check"), - None: mock.Mock(return_value="none_check"), - }) - def test_check(self): - result = policy._parse_check('spam:handler') - - self.assertEqual(result, 'spam_check') - policy._checks['spam'].assert_called_once_with('spam', 'handler') - self.assertFalse(policy._checks[None].called) - - @mock.patch.object(policy, '_checks', { - None: mock.Mock(return_value="none_check"), - }) - def test_check_default(self): - result = policy._parse_check('spam:handler') - - self.assertEqual(result, 'none_check') - policy._checks[None].assert_called_once_with('spam', 'handler') - - -class ParseListRuleTestCase(test_base.BaseTestCase): - def test_empty(self): - result = policy._parse_list_rule([]) - - self.assertTrue(isinstance(result, policy.TrueCheck)) - self.assertEqual(str(result), '@') - - @mock.patch.object(policy, '_parse_check', FakeCheck) - def test_oneele_zeroele(self): - result = policy._parse_list_rule([[]]) - - self.assertTrue(isinstance(result, policy.FalseCheck)) - self.assertEqual(str(result), '!') - - @mock.patch.object(policy, '_parse_check', FakeCheck) - def test_oneele_bare(self): - result = policy._parse_list_rule(['rule']) - - self.assertTrue(isinstance(result, FakeCheck)) - self.assertEqual(result.result, 'rule') - self.assertEqual(str(result), 'rule') - - @mock.patch.object(policy, '_parse_check', FakeCheck) - def test_oneele_oneele(self): - result = policy._parse_list_rule([['rule']]) - - self.assertTrue(isinstance(result, FakeCheck)) - self.assertEqual(result.result, 'rule') - self.assertEqual(str(result), 'rule') - - @mock.patch.object(policy, '_parse_check', FakeCheck) - def test_oneele_multi(self): - result = policy._parse_list_rule([['rule1', 'rule2']]) - - self.assertTrue(isinstance(result, policy.AndCheck)) - self.assertEqual(len(result.rules), 2) - for i, value in enumerate(['rule1', 'rule2']): - self.assertTrue(isinstance(result.rules[i], FakeCheck)) - self.assertEqual(result.rules[i].result, value) - self.assertEqual(str(result), '(rule1 and rule2)') - - @mock.patch.object(policy, '_parse_check', FakeCheck) - def test_multi_oneele(self): - result = policy._parse_list_rule([['rule1'], ['rule2']]) - - self.assertTrue(isinstance(result, policy.OrCheck)) - self.assertEqual(len(result.rules), 2) - for i, value in enumerate(['rule1', 'rule2']): - self.assertTrue(isinstance(result.rules[i], FakeCheck)) - self.assertEqual(result.rules[i].result, value) - self.assertEqual(str(result), '(rule1 or rule2)') - - @mock.patch.object(policy, '_parse_check', FakeCheck) - def test_multi_multi(self): - result = policy._parse_list_rule([['rule1', 'rule2'], - ['rule3', 'rule4']]) - - self.assertTrue(isinstance(result, policy.OrCheck)) - self.assertEqual(len(result.rules), 2) - for i, values in enumerate([['rule1', 'rule2'], ['rule3', 'rule4']]): - self.assertTrue(isinstance(result.rules[i], policy.AndCheck)) - self.assertEqual(len(result.rules[i].rules), 2) - for j, value in enumerate(values): - self.assertTrue(isinstance(result.rules[i].rules[j], - FakeCheck)) - self.assertEqual(result.rules[i].rules[j].result, value) - self.assertEqual(str(result), - '((rule1 and rule2) or (rule3 and rule4))') - - -class ParseTokenizeTestCase(test_base.BaseTestCase): - @mock.patch.object(policy, '_parse_check', lambda x: x) - def test_tokenize(self): - exemplar = ("(( ( ((() And)) or ) (check:%(miss)s) not)) " - "'a-string' \"another-string\"") - expected = [ - ('(', '('), ('(', '('), ('(', '('), ('(', '('), ('(', '('), - ('(', '('), (')', ')'), ('and', 'And'), - (')', ')'), (')', ')'), ('or', 'or'), (')', ')'), ('(', '('), - ('check', 'check:%(miss)s'), (')', ')'), ('not', 'not'), - (')', ')'), (')', ')'), - ('string', 'a-string'), - ('string', 'another-string'), - ] - - result = list(policy._parse_tokenize(exemplar)) - - self.assertEqual(result, expected) - - -class ParseStateMetaTestCase(test_base.BaseTestCase): - def test_reducer(self): - @policy.reducer('a', 'b', 'c') - @policy.reducer('d', 'e', 'f') - def spam(): - pass - - self.assertTrue(hasattr(spam, 'reducers')) - self.assertEqual(spam.reducers, [['d', 'e', 'f'], ['a', 'b', 'c']]) - - def test_parse_state_meta(self): - @six.add_metaclass(policy._ParseStateMeta) - class FakeState(object): - - @policy.reducer('a', 'b', 'c') - @policy.reducer('d', 'e', 'f') - def reduce1(self): - pass - - @policy.reducer('g', 'h', 'i') - def reduce2(self): - pass - - self.assertTrue(hasattr(FakeState, 'reducers')) - for reduction, reducer in FakeState.reducers: - if (reduction == ['a', 'b', 'c'] or - reduction == ['d', 'e', 'f']): - self.assertEqual(reducer, 'reduce1') - elif reduction == ['g', 'h', 'i']: - self.assertEqual(reducer, 'reduce2') - else: - self.fail("Unrecognized reducer discovered") - - -class ParseStateTestCase(test_base.BaseTestCase): - def test_init(self): - state = policy._ParseState() - - self.assertEqual(state.tokens, []) - self.assertEqual(state.values, []) - - @mock.patch.object(policy._ParseState, 'reducers', [(['tok1'], 'meth')]) - @mock.patch.object(policy._ParseState, 'meth', create=True) - def test_reduce_none(self, mock_meth): - state = policy._ParseState() - state.tokens = ['tok2'] - state.values = ['val2'] - - state.reduce() - - self.assertEqual(state.tokens, ['tok2']) - self.assertEqual(state.values, ['val2']) - self.assertFalse(mock_meth.called) - - @mock.patch.object(policy._ParseState, 'reducers', - [(['tok1', 'tok2'], 'meth')]) - @mock.patch.object(policy._ParseState, 'meth', create=True) - def test_reduce_short(self, mock_meth): - state = policy._ParseState() - state.tokens = ['tok1'] - state.values = ['val1'] - - state.reduce() - - self.assertEqual(state.tokens, ['tok1']) - self.assertEqual(state.values, ['val1']) - self.assertFalse(mock_meth.called) - - @mock.patch.object(policy._ParseState, 'reducers', - [(['tok1', 'tok2'], 'meth')]) - @mock.patch.object(policy._ParseState, 'meth', create=True, - return_value=[('tok3', 'val3')]) - def test_reduce_one(self, mock_meth): - state = policy._ParseState() - state.tokens = ['tok1', 'tok2'] - state.values = ['val1', 'val2'] - - state.reduce() - - self.assertEqual(state.tokens, ['tok3']) - self.assertEqual(state.values, ['val3']) - mock_meth.assert_called_once_with('val1', 'val2') - - @mock.patch.object(policy._ParseState, 'reducers', [ - (['tok1', 'tok4'], 'meth2'), - (['tok2', 'tok3'], 'meth1'), - ]) - @mock.patch.object(policy._ParseState, 'meth1', create=True, - return_value=[('tok4', 'val4')]) - @mock.patch.object(policy._ParseState, 'meth2', create=True, - return_value=[('tok5', 'val5')]) - def test_reduce_two(self, mock_meth2, mock_meth1): - state = policy._ParseState() - state.tokens = ['tok1', 'tok2', 'tok3'] - state.values = ['val1', 'val2', 'val3'] - - state.reduce() - - self.assertEqual(state.tokens, ['tok5']) - self.assertEqual(state.values, ['val5']) - mock_meth1.assert_called_once_with('val2', 'val3') - mock_meth2.assert_called_once_with('val1', 'val4') - - @mock.patch.object(policy._ParseState, 'reducers', - [(['tok1', 'tok2'], 'meth')]) - @mock.patch.object(policy._ParseState, 'meth', create=True, - return_value=[('tok3', 'val3'), ('tok4', 'val4')]) - def test_reduce_multi(self, mock_meth): - state = policy._ParseState() - state.tokens = ['tok1', 'tok2'] - state.values = ['val1', 'val2'] - - state.reduce() - - self.assertEqual(state.tokens, ['tok3', 'tok4']) - self.assertEqual(state.values, ['val3', 'val4']) - mock_meth.assert_called_once_with('val1', 'val2') - - def test_shift(self): - state = policy._ParseState() - - with mock.patch.object(policy._ParseState, 'reduce') as mock_reduce: - state.shift('token', 'value') - - self.assertEqual(state.tokens, ['token']) - self.assertEqual(state.values, ['value']) - mock_reduce.assert_called_once_with() - - def test_result_empty(self): - state = policy._ParseState() - - self.assertRaises(ValueError, lambda: state.result) - - def test_result_unreduced(self): - state = policy._ParseState() - state.tokens = ['tok1', 'tok2'] - state.values = ['val1', 'val2'] - - self.assertRaises(ValueError, lambda: state.result) - - def test_result(self): - state = policy._ParseState() - state.tokens = ['token'] - state.values = ['value'] - - self.assertEqual(state.result, 'value') - - def test_wrap_check(self): - state = policy._ParseState() - - result = state._wrap_check('(', 'the_check', ')') - - self.assertEqual(result, [('check', 'the_check')]) - - @mock.patch.object(policy, 'AndCheck', lambda x: x) - def test_make_and_expr(self): - state = policy._ParseState() - - result = state._make_and_expr('check1', 'and', 'check2') - - self.assertEqual(result, [('and_expr', ['check1', 'check2'])]) - - def test_extend_and_expr(self): - state = policy._ParseState() - mock_expr = mock.Mock() - mock_expr.add_check.return_value = 'newcheck' - - result = state._extend_and_expr(mock_expr, 'and', 'check') - - self.assertEqual(result, [('and_expr', 'newcheck')]) - mock_expr.add_check.assert_called_once_with('check') - - @mock.patch.object(policy, 'OrCheck', lambda x: x) - def test_make_or_expr(self): - state = policy._ParseState() - - result = state._make_or_expr('check1', 'or', 'check2') - - self.assertEqual(result, [('or_expr', ['check1', 'check2'])]) - - def test_extend_or_expr(self): - state = policy._ParseState() - mock_expr = mock.Mock() - mock_expr.add_check.return_value = 'newcheck' - - result = state._extend_or_expr(mock_expr, 'or', 'check') - - self.assertEqual(result, [('or_expr', 'newcheck')]) - mock_expr.add_check.assert_called_once_with('check') - - @mock.patch.object(policy, 'NotCheck', lambda x: 'not %s' % x) - def test_make_not_expr(self): - state = policy._ParseState() - - result = state._make_not_expr('not', 'check') - - self.assertEqual(result, [('check', 'not check')]) - - -class ParseTextRuleTestCase(test_base.BaseTestCase): - def test_empty(self): - result = policy._parse_text_rule('') - - self.assertTrue(isinstance(result, policy.TrueCheck)) - - @mock.patch.object(policy, '_parse_tokenize', - return_value=[('tok1', 'val1'), ('tok2', 'val2')]) - @mock.patch.object(policy._ParseState, 'shift') - @mock.patch.object(policy._ParseState, 'result', 'result') - def test_shifts(self, mock_shift, mock_parse_tokenize): - result = policy._parse_text_rule('test rule') - - self.assertEqual(result, 'result') - mock_parse_tokenize.assert_called_once_with('test rule') - mock_shift.assert_has_calls( - [mock.call('tok1', 'val1'), mock.call('tok2', 'val2')]) - - @mock.patch.object(policy, '_parse_tokenize', return_value=[]) - def test_fail(self, mock_parse_tokenize): - result = policy._parse_text_rule('test rule') - - self.assertTrue(isinstance(result, policy.FalseCheck)) - mock_parse_tokenize.assert_called_once_with('test rule') - - -class ParseRuleTestCase(test_base.BaseTestCase): - @mock.patch.object(policy, '_parse_text_rule', return_value='text rule') - @mock.patch.object(policy, '_parse_list_rule', return_value='list rule') - def test_parse_rule_string(self, mock_parse_list_rule, - mock_parse_text_rule): - result = policy._parse_rule("a string") - - self.assertEqual(result, 'text rule') - self.assertFalse(mock_parse_list_rule.called) - mock_parse_text_rule.assert_called_once_with('a string') - - @mock.patch.object(policy, '_parse_text_rule', return_value='text rule') - @mock.patch.object(policy, '_parse_list_rule', return_value='list rule') - def test_parse_rule_list(self, mock_parse_list_rule, mock_parse_text_rule): - result = policy._parse_rule([['a'], ['list']]) - - self.assertEqual(result, 'list rule') - self.assertFalse(mock_parse_text_rule.called) - mock_parse_list_rule.assert_called_once_with([['a'], ['list']]) - - -class CheckRegisterTestCase(test_base.BaseTestCase): - @mock.patch.object(policy, '_checks', {}) - def test_register_check(self): - class TestCheck(policy.Check): - pass - - policy.register('spam', TestCheck) - - self.assertEqual(policy._checks, dict(spam=TestCheck)) - - @mock.patch.object(policy, '_checks', {}) - def test_register_check_decorator(self): - @policy.register('spam') - class TestCheck(policy.Check): - pass - - self.assertEqual(policy._checks, dict(spam=TestCheck)) - - -class RuleCheckTestCase(test_base.BaseTestCase): - @mock.patch.object(ENFORCER, 'rules', {}) - def test_rule_missing(self): - check = policy.RuleCheck('rule', 'spam') - - self.assertEqual(check('target', 'creds', ENFORCER), False) - - @mock.patch.object(ENFORCER, 'rules', - dict(spam=mock.Mock(return_value=False))) - def test_rule_false(self): - enforcer = ENFORCER - - check = policy.RuleCheck('rule', 'spam') - - self.assertEqual(check('target', 'creds', enforcer), False) - enforcer.rules['spam'].assert_called_once_with('target', 'creds', - enforcer) - - @mock.patch.object(ENFORCER, 'rules', - dict(spam=mock.Mock(return_value=True))) - def test_rule_true(self): - enforcer = ENFORCER - check = policy.RuleCheck('rule', 'spam') - - self.assertEqual(check('target', 'creds', enforcer), True) - enforcer.rules['spam'].assert_called_once_with('target', 'creds', - enforcer) - - -class RoleCheckTestCase(PolicyBaseTestCase): - def test_accept(self): - check = policy.RoleCheck('role', 'sPaM') - - self.assertEqual(check('target', dict(roles=['SpAm']), - self.enforcer), True) - - def test_reject(self): - check = policy.RoleCheck('role', 'spam') - - self.assertEqual(check('target', dict(roles=[]), self.enforcer), False) - - -class HttpCheckTestCase(PolicyBaseTestCase): - def decode_post_data(self, post_data): - result = {} - for item in post_data.split('&'): - key, _sep, value = item.partition('=') - result[key] = jsonutils.loads(urlparse.unquote_plus(value)) - - return result - - @mock.patch.object(urlrequest, 'urlopen', - return_value=six.StringIO('True')) - def test_accept(self, mock_urlopen): - check = policy.HttpCheck('http', '//example.com/%(name)s') - self.assertEqual(check(dict(name='target', spam='spammer'), - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer), - True) - self.assertEqual(mock_urlopen.call_count, 1) - - args = mock_urlopen.call_args[0] - - self.assertEqual(args[0], 'http://example.com/target') - self.assertEqual(self.decode_post_data(args[1]), dict( - target=dict(name='target', spam='spammer'), - credentials=dict(user='user', roles=['a', 'b', 'c']), - )) - - @mock.patch.object(urlrequest, 'urlopen', - return_value=six.StringIO('other')) - def test_reject(self, mock_urlopen): - check = policy.HttpCheck('http', '//example.com/%(name)s') - - self.assertEqual(check(dict(name='target', spam='spammer'), - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer), - False) - self.assertEqual(mock_urlopen.call_count, 1) - - args = mock_urlopen.call_args[0] - - self.assertEqual(args[0], 'http://example.com/target') - self.assertEqual(self.decode_post_data(args[1]), dict( - target=dict(name='target', spam='spammer'), - credentials=dict(user='user', roles=['a', 'b', 'c']), - )) - - @mock.patch.object(urlrequest, 'urlopen', - return_value=six.StringIO('True')) - def test_http_with_objects_in_target(self, mock_urlopen): - - check = policy.HttpCheck('http', '//example.com/%(name)s') - target = {'a': object(), - 'name': 'target', - 'b': 'test data'} - self.assertEqual(check(target, - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer), - True) - - @mock.patch.object(urlrequest, 'urlopen', - return_value=six.StringIO('True')) - def test_http_with_strings_in_target(self, mock_urlopen): - check = policy.HttpCheck('http', '//example.com/%(name)s') - target = {'a': 'some_string', - 'name': 'target', - 'b': 'test data'} - self.assertEqual(check(target, - dict(user='user', roles=['a', 'b', 'c']), - self.enforcer), - True) - - -class GenericCheckTestCase(PolicyBaseTestCase): - def test_no_cred(self): - check = policy.GenericCheck('name', '%(name)s') - - self.assertEqual(check(dict(name='spam'), {}, self.enforcer), False) - - def test_cred_mismatch(self): - check = policy.GenericCheck('name', '%(name)s') - - self.assertEqual(check(dict(name='spam'), - dict(name='ham'), - self.enforcer), False) - - def test_accept(self): - check = policy.GenericCheck('name', '%(name)s') - - self.assertEqual(check(dict(name='spam'), - dict(name='spam'), - self.enforcer), True) - - def test_no_key_match_in_target(self): - check = policy.GenericCheck('name', '%(name)s') - - self.assertEqual(check(dict(name1='spam'), - dict(name='spam'), - self.enforcer), False) - - def test_constant_string_mismatch(self): - check = policy.GenericCheck("'spam'", '%(name)s') - - self.assertEqual(check(dict(name='ham'), - {}, - self.enforcer), False) - - def test_constant_string_accept(self): - check = policy.GenericCheck("'spam'", '%(name)s') - - self.assertEqual(check(dict(name='spam'), - {}, - self.enforcer), True) - - def test_constant_literal_mismatch(self): - check = policy.GenericCheck("True", '%(enabled)s') - - self.assertEqual(check(dict(enabled=False), - {}, - self.enforcer), False) - - def test_constant_literal_accept(self): - check = policy.GenericCheck("True", '%(enabled)s') - - self.assertEqual(check(dict(enabled=True), - {}, - self.enforcer), True) - - def test_deep_credentials_dictionary_lookup(self): - check = policy.GenericCheck("a.b.c.d", 'APPLES') - - credentials = {'a': {'b': {'c': {'d': 'APPLES'}}}} - - self.assertEqual(check({}, - credentials, - self.enforcer), True) - - def test_missing_credentials_dictionary_lookup(self): - credentials = {'a': 'APPLES', 'o': {'t': 'ORANGES'}} - - # First a valid check - rest of case is expecting failures - # Should prove the basic credentials structure before we test - # for failure cases. - check = policy.GenericCheck("o.t", 'ORANGES') - self.assertEqual(check({}, - credentials, - self.enforcer), True) - - # Case where final key is missing - check = policy.GenericCheck("o.v", 'ORANGES') - self.assertEqual(check({}, - credentials, - self.enforcer), False) - - # Attempt to access key under a missing dictionary - check = policy.GenericCheck("q.v", 'APPLES') - self.assertEqual(check({}, - credentials, - self.enforcer), False)