diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py index 9337280f..44c0c557 100644 --- a/oslo_policy/policy.py +++ b/oslo_policy/policy.py @@ -232,6 +232,7 @@ import yaml from oslo_policy import _cache_handler from oslo_policy import _checks from oslo_policy._i18n import _ +from oslo_policy._i18n import _LW from oslo_policy import _parser from oslo_policy import opts @@ -529,6 +530,76 @@ class Enforcer(object): if default.name not in self.rules: self.rules[default.name] = default.check + # Detect and log obvious incorrect rule definitions + self.check_rules() + + def check_rules(self): + """Look for rule definitions that are obviously incorrect.""" + undefined_checks = [] + cyclic_checks = [] + violation = False + for name, check in self.rules.items(): + if self._undefined_check(check): + undefined_checks.append(name) + violation = True + if self._cycle_check(check): + cyclic_checks.append(name) + violation = True + + if undefined_checks: + LOG.warning(_LW('Policies %(names)s reference a rule that is not ' + 'defined.'), {'names': undefined_checks}) + if cyclic_checks: + LOG.warning(_LW('Policies %(names)s are part of a cyclical ' + 'reference.'), {'names': cyclic_checks}) + + return not violation + + def _undefined_check(self, check): + '''Check if a RuleCheck references an undefined rule.''' + if isinstance(check, RuleCheck): + if check.match not in self.rules: + # Undefined rule + return True + + # An AndCheck or OrCheck is composed of multiple rules so check + # each of those. + rules = getattr(check, 'rules', None) + if rules: + for rule in rules: + if self._undefined_check(rule): + return True + return False + + def _cycle_check(self, check, seen=None): + '''Check if RuleChecks cycle. + + Looking for something like: + "foo": "rule:bar" + "bar": "rule:foo" + ''' + if seen is None: + seen = set() + + if isinstance(check, RuleCheck): + if check.match in seen: + # Cycle found + return True + seen.add(check.match) + if check.match in self.rules: + # There can only be a cycle if the referenced rule is defined. + if self._cycle_check(self.rules[check.match], seen): + return True + + # An AndCheck or OrCheck is composed of multiple rules so check + # each of those. + rules = getattr(check, 'rules', None) + if rules: + for rule in rules: + if self._cycle_check(rule, seen): + return True + return False + @staticmethod def _is_directory_updated(cache, path): # Get the current modified time and compare it to what is in diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py index 3fad5c9f..dc3ede78 100644 --- a/oslo_policy/tests/test_policy.py +++ b/oslo_policy/tests/test_policy.py @@ -832,3 +832,36 @@ class RuleDefaultTestCase(base.PolicyBaseTestCase): opt1 = policy.RuleDefault(name='foo', check_str='rule:foo') opt2 = RuleDefaultSub(name='bar', check_str='rule:foo') self.assertNotEqual(opt1, opt2) + + +class EnforcerCheckRulesTest(base.PolicyBaseTestCase): + def setUp(self): + super(EnforcerCheckRulesTest, self).setUp() + + def test_no_violations(self): + self.create_config_file('policy.json', POLICY_JSON_CONTENTS) + self.enforcer.load_rules(True) + self.assertTrue(self.enforcer.check_rules()) + + def test_undefined_rule(self): + rules = jsonutils.dumps({'foo': 'rule:bar'}) + self.create_config_file('policy.json', rules) + self.enforcer.load_rules(True) + + self.assertFalse(self.enforcer.check_rules()) + + def test_cyclical_rules(self): + rules = jsonutils.dumps({'foo': 'rule:bar', 'bar': 'rule:foo'}) + self.create_config_file('policy.json', rules) + self.enforcer.load_rules(True) + + self.assertFalse(self.enforcer.check_rules()) + + def test_complex_cyclical_rules(self): + rules = jsonutils.dumps({'foo': 'rule:bar', + 'bar': 'rule:baz and role:admin', + 'baz': 'rule:foo or role:user'}) + self.create_config_file('policy.json', rules) + self.enforcer.load_rules(True) + + self.assertFalse(self.enforcer.check_rules())