Merge "Perform basic checks on policy definitions"

This commit is contained in:
Jenkins 2016-10-13 19:23:15 +00:00 committed by Gerrit Code Review
commit 890e2bfa90
2 changed files with 104 additions and 0 deletions

View File

@ -232,6 +232,7 @@ import yaml
from oslo_policy import _cache_handler
from oslo_policy import _checks
from oslo_policy._i18n import _
from oslo_policy._i18n import _LW
from oslo_policy import _parser
from oslo_policy import opts
@ -529,6 +530,76 @@ class Enforcer(object):
if default.name not in self.rules:
self.rules[default.name] = default.check
# Detect and log obvious incorrect rule definitions
self.check_rules()
def check_rules(self):
"""Look for rule definitions that are obviously incorrect."""
undefined_checks = []
cyclic_checks = []
violation = False
for name, check in self.rules.items():
if self._undefined_check(check):
undefined_checks.append(name)
violation = True
if self._cycle_check(check):
cyclic_checks.append(name)
violation = True
if undefined_checks:
LOG.warning(_LW('Policies %(names)s reference a rule that is not '
'defined.'), {'names': undefined_checks})
if cyclic_checks:
LOG.warning(_LW('Policies %(names)s are part of a cyclical '
'reference.'), {'names': cyclic_checks})
return not violation
def _undefined_check(self, check):
'''Check if a RuleCheck references an undefined rule.'''
if isinstance(check, RuleCheck):
if check.match not in self.rules:
# Undefined rule
return True
# An AndCheck or OrCheck is composed of multiple rules so check
# each of those.
rules = getattr(check, 'rules', None)
if rules:
for rule in rules:
if self._undefined_check(rule):
return True
return False
def _cycle_check(self, check, seen=None):
'''Check if RuleChecks cycle.
Looking for something like:
"foo": "rule:bar"
"bar": "rule:foo"
'''
if seen is None:
seen = set()
if isinstance(check, RuleCheck):
if check.match in seen:
# Cycle found
return True
seen.add(check.match)
if check.match in self.rules:
# There can only be a cycle if the referenced rule is defined.
if self._cycle_check(self.rules[check.match], seen):
return True
# An AndCheck or OrCheck is composed of multiple rules so check
# each of those.
rules = getattr(check, 'rules', None)
if rules:
for rule in rules:
if self._cycle_check(rule, seen):
return True
return False
@staticmethod
def _is_directory_updated(cache, path):
# Get the current modified time and compare it to what is in

View File

@ -832,3 +832,36 @@ class RuleDefaultTestCase(base.PolicyBaseTestCase):
opt1 = policy.RuleDefault(name='foo', check_str='rule:foo')
opt2 = RuleDefaultSub(name='bar', check_str='rule:foo')
self.assertNotEqual(opt1, opt2)
class EnforcerCheckRulesTest(base.PolicyBaseTestCase):
def setUp(self):
super(EnforcerCheckRulesTest, self).setUp()
def test_no_violations(self):
self.create_config_file('policy.json', POLICY_JSON_CONTENTS)
self.enforcer.load_rules(True)
self.assertTrue(self.enforcer.check_rules())
def test_undefined_rule(self):
rules = jsonutils.dumps({'foo': 'rule:bar'})
self.create_config_file('policy.json', rules)
self.enforcer.load_rules(True)
self.assertFalse(self.enforcer.check_rules())
def test_cyclical_rules(self):
rules = jsonutils.dumps({'foo': 'rule:bar', 'bar': 'rule:foo'})
self.create_config_file('policy.json', rules)
self.enforcer.load_rules(True)
self.assertFalse(self.enforcer.check_rules())
def test_complex_cyclical_rules(self):
rules = jsonutils.dumps({'foo': 'rule:bar',
'bar': 'rule:baz and role:admin',
'baz': 'rule:foo or role:user'})
self.create_config_file('policy.json', rules)
self.enforcer.load_rules(True)
self.assertFalse(self.enforcer.check_rules())