Perform basic checks on policy definitions

It is easy for someone to configure a policy to reference a rule
definition that does not exist. In this case the check will fallback to
the default rule(if it exists) and the deployer will never realize
they've incorrectly defined their policy.

It is also possible to configure a set of rule checks with a cycle. This
will fail at runtime with a "RunTimeError: maximum recursion depth
exceeded" and a potential 500 HTTP response from whatever API is in use.

This change adds a check for those cases which will log a warning when
those occur. The check can also be called when a service using
oslo.policy starts up and fail to start for incorrect policy
definitions. The default behavior is just to log the condition in case a
policy file is changed after startup and reloaded. That should not raise
an exception which may go unhandled and kill the service.

Change-Id: If2d1a5df05091e013293ec350861d7ada1753a47
This commit is contained in:
Andrew Laski 2016-09-20 13:53:14 -04:00 committed by ChangBo Guo(gcb)
parent 24e745a9c9
commit a1444fc415
2 changed files with 104 additions and 0 deletions

View File

@ -232,6 +232,7 @@ import yaml
from oslo_policy import _cache_handler
from oslo_policy import _checks
from oslo_policy._i18n import _
from oslo_policy._i18n import _LW
from oslo_policy import _parser
from oslo_policy import opts
@ -529,6 +530,76 @@ class Enforcer(object):
if default.name not in self.rules:
self.rules[default.name] = default.check
# Detect and log obvious incorrect rule definitions
self.check_rules()
def check_rules(self):
"""Look for rule definitions that are obviously incorrect."""
undefined_checks = []
cyclic_checks = []
violation = False
for name, check in self.rules.items():
if self._undefined_check(check):
undefined_checks.append(name)
violation = True
if self._cycle_check(check):
cyclic_checks.append(name)
violation = True
if undefined_checks:
LOG.warning(_LW('Policies %(names)s reference a rule that is not '
'defined.'), {'names': undefined_checks})
if cyclic_checks:
LOG.warning(_LW('Policies %(names)s are part of a cyclical '
'reference.'), {'names': cyclic_checks})
return not violation
def _undefined_check(self, check):
'''Check if a RuleCheck references an undefined rule.'''
if isinstance(check, RuleCheck):
if check.match not in self.rules:
# Undefined rule
return True
# An AndCheck or OrCheck is composed of multiple rules so check
# each of those.
rules = getattr(check, 'rules', None)
if rules:
for rule in rules:
if self._undefined_check(rule):
return True
return False
def _cycle_check(self, check, seen=None):
'''Check if RuleChecks cycle.
Looking for something like:
"foo": "rule:bar"
"bar": "rule:foo"
'''
if seen is None:
seen = set()
if isinstance(check, RuleCheck):
if check.match in seen:
# Cycle found
return True
seen.add(check.match)
if check.match in self.rules:
# There can only be a cycle if the referenced rule is defined.
if self._cycle_check(self.rules[check.match], seen):
return True
# An AndCheck or OrCheck is composed of multiple rules so check
# each of those.
rules = getattr(check, 'rules', None)
if rules:
for rule in rules:
if self._cycle_check(rule, seen):
return True
return False
@staticmethod
def _is_directory_updated(cache, path):
# Get the current modified time and compare it to what is in

View File

@ -832,3 +832,36 @@ class RuleDefaultTestCase(base.PolicyBaseTestCase):
opt1 = policy.RuleDefault(name='foo', check_str='rule:foo')
opt2 = RuleDefaultSub(name='bar', check_str='rule:foo')
self.assertNotEqual(opt1, opt2)
class EnforcerCheckRulesTest(base.PolicyBaseTestCase):
def setUp(self):
super(EnforcerCheckRulesTest, self).setUp()
def test_no_violations(self):
self.create_config_file('policy.json', POLICY_JSON_CONTENTS)
self.enforcer.load_rules(True)
self.assertTrue(self.enforcer.check_rules())
def test_undefined_rule(self):
rules = jsonutils.dumps({'foo': 'rule:bar'})
self.create_config_file('policy.json', rules)
self.enforcer.load_rules(True)
self.assertFalse(self.enforcer.check_rules())
def test_cyclical_rules(self):
rules = jsonutils.dumps({'foo': 'rule:bar', 'bar': 'rule:foo'})
self.create_config_file('policy.json', rules)
self.enforcer.load_rules(True)
self.assertFalse(self.enforcer.check_rules())
def test_complex_cyclical_rules(self):
rules = jsonutils.dumps({'foo': 'rule:bar',
'bar': 'rule:baz and role:admin',
'baz': 'rule:foo or role:user'})
self.create_config_file('policy.json', rules)
self.enforcer.load_rules(True)
self.assertFalse(self.enforcer.check_rules())