diff --git a/oslo_policy/shell.py b/oslo_policy/shell.py index 222ab6a9..fe143fb4 100644 --- a/oslo_policy/shell.py +++ b/oslo_policy/shell.py @@ -14,6 +14,7 @@ # limitations under the License. import argparse +import collections import sys from oslo_serialization import jsonutils @@ -33,7 +34,25 @@ def _try_rule(key, rule, target, access_data, o): print("exception: %s" % rule) -def tool(policy_file, access_file, apply_rule, is_admin=False): +def flatten(d, parent_key=''): + """Flatten a nested dictionary + + Converts a dictionary with nested values to a single level flat + dictionary, with dotted notation for each key. + + """ + items = [] + for k, v in d.items(): + new_key = parent_key + '.' + k if parent_key else k + if isinstance(v, collections.MutableMapping): + items.extend(flatten(v, new_key).items()) + else: + items.append((new_key, v)) + return dict(items) + + +def tool(policy_file, access_file, apply_rule, is_admin=False, + target_file=None): access = access_file.read() access_data = jsonutils.loads(access)['token'] access_data['roles'] = [role['name'] for role in access_data['roles']] @@ -47,16 +66,20 @@ def tool(policy_file, access_file, apply_rule, is_admin=False): o = Object() o.rules = rules - target = {"project_id": access_data['project_id']} + if target_file: + target = target_file.read() + target_data = flatten(jsonutils.loads(target)) + else: + target_data = {"project_id": access_data['project_id']} if apply_rule: key = apply_rule rule = rules[apply_rule] - _try_rule(key, rule, target, access_data, o) + _try_rule(key, rule, target_data, access_data, o) return for key, rule in rules.items(): if ":" in key: - _try_rule(key, rule, target, access_data, o) + _try_rule(key, rule, target_data, access_data, o) def main(): @@ -72,6 +95,11 @@ def main(): type=argparse.FileType('rb', 0), help='path to a file containing OpenStack Identity API' + ' access info in JSON format') + parser.add_argument( + '--target', + type=argparse.FileType('rb', 0), + help='path to a file containing custom target info in' + + ' JSON format. This will be used to evaluate the policy with.') parser.add_argument( '--rule', help='rule to test') @@ -85,7 +113,7 @@ def main(): is_admin = args.is_admin.lower() == "true" except Exception: is_admin = False - tool(args.policy, args.access, args.rule, is_admin) + tool(args.policy, args.access, args.rule, is_admin, args.target) if __name__ == "__main__": diff --git a/oslo_policy/tests/test_shell.py b/oslo_policy/tests/test_shell.py index 70fa407c..97c64c81 100644 --- a/oslo_policy/tests/test_shell.py +++ b/oslo_policy/tests/test_shell.py @@ -62,6 +62,39 @@ class CheckerTestCase(base.PolicyBaseTestCase): ''' self.assertEqual(expected, stdout.getvalue()) + @mock.patch("oslo_policy._checks.TrueCheck.__call__") + def test_pass_rule_parameters_with_custom_target(self, call_mock): + apply_rule = None + is_admin = False + access_data = token_fixture.SCOPED_TOKEN_FIXTURE["token"] + access_data['roles'] = [ + role['name'] for role in access_data['roles']] + access_data['project_id'] = access_data['project']['id'] + access_data['is_admin'] = is_admin + + sample_target = { + "project_id": access_data["project"]["id"], + "domain_id": access_data["project"]["domain"]["id"] + } + self.create_config_file( + "target.json", + jsonutils.dumps(sample_target)) + + policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r') + access_file = open(self.get_config_file_fullname('access.json'), 'r') + target_file = open(self.get_config_file_fullname('target.json'), 'r') + stdout = self._capture_stdout() + + shell.tool(policy_file, access_file, apply_rule, is_admin, + target_file) + call_mock.assert_called_once_with( + sample_target, access_data, mock.ANY, + current_rule="sampleservice:sample_rule") + + expected = '''passed: sampleservice:sample_rule +''' + self.assertEqual(expected, stdout.getvalue()) + def test_all_nonadmin(self): policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r') @@ -75,3 +108,30 @@ class CheckerTestCase(base.PolicyBaseTestCase): expected = '''passed: sampleservice:sample_rule ''' self.assertEqual(expected, stdout.getvalue()) + + def test_flatten_from_dict(self): + target = { + "target": { + "secret": { + "project_id": "1234" + } + } + } + result = shell.flatten(target) + self.assertEqual(result, {"target.secret.project_id": "1234"}) + + def test_flatten_from_file(self): + target = { + "target": { + "secret": { + "project_id": "1234" + } + } + } + self.create_config_file( + "target.json", + jsonutils.dumps(target)) + target_file = open(self.get_config_file_fullname('target.json'), 'r') + target_from_file = target_file.read() + result = shell.flatten(jsonutils.loads(target_from_file)) + self.assertEqual(result, {"target.secret.project_id": "1234"}) diff --git a/releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml b/releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml new file mode 100644 index 00000000..a844f3f0 --- /dev/null +++ b/releasenotes/notes/Pass-target-dict-to-oslopolicy-checker-87185d40aec413ee.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + oslopolicy-checker was added the ability to accept a file containing a hash + that represents the target. This makes it possible to check policies that + have non-conventional targets such as barbican.