Add ability to pass in target data for the oslopolicy-checker
This allows us to test the policy for other services which might have different or unusual target data formats (such as Barbican). It would be possible to pass it as a nested dictionary, e.g.: { "target": { "secret": { "project_id": "my project id" } } } or as a key pair (as oslo.policy would expect): { "target.secret.project_id": "my project id" } Both will work (note that this logic was taken from barbican). This fixes around the limitation that the target is hardcoded to be "project_id", and thus allows to test more scenarios (such as the project ID not matching). Change-Id: Ia9f7462072a8cb142251c8bb5ef19d9a25a98119
This commit is contained in:
parent
d746dfb5f4
commit
f79650325f
@ -14,6 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import collections
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
@ -33,7 +34,25 @@ def _try_rule(key, rule, target, access_data, o):
|
|||||||
print("exception: %s" % rule)
|
print("exception: %s" % rule)
|
||||||
|
|
||||||
|
|
||||||
def tool(policy_file, access_file, apply_rule, is_admin=False):
|
def flatten(d, parent_key=''):
|
||||||
|
"""Flatten a nested dictionary
|
||||||
|
|
||||||
|
Converts a dictionary with nested values to a single level flat
|
||||||
|
dictionary, with dotted notation for each key.
|
||||||
|
|
||||||
|
"""
|
||||||
|
items = []
|
||||||
|
for k, v in d.items():
|
||||||
|
new_key = parent_key + '.' + k if parent_key else k
|
||||||
|
if isinstance(v, collections.MutableMapping):
|
||||||
|
items.extend(flatten(v, new_key).items())
|
||||||
|
else:
|
||||||
|
items.append((new_key, v))
|
||||||
|
return dict(items)
|
||||||
|
|
||||||
|
|
||||||
|
def tool(policy_file, access_file, apply_rule, is_admin=False,
|
||||||
|
target_file=None):
|
||||||
access = access_file.read()
|
access = access_file.read()
|
||||||
access_data = jsonutils.loads(access)['token']
|
access_data = jsonutils.loads(access)['token']
|
||||||
access_data['roles'] = [role['name'] for role in access_data['roles']]
|
access_data['roles'] = [role['name'] for role in access_data['roles']]
|
||||||
@ -47,16 +66,20 @@ def tool(policy_file, access_file, apply_rule, is_admin=False):
|
|||||||
o = Object()
|
o = Object()
|
||||||
o.rules = rules
|
o.rules = rules
|
||||||
|
|
||||||
target = {"project_id": access_data['project_id']}
|
if target_file:
|
||||||
|
target = target_file.read()
|
||||||
|
target_data = flatten(jsonutils.loads(target))
|
||||||
|
else:
|
||||||
|
target_data = {"project_id": access_data['project_id']}
|
||||||
|
|
||||||
if apply_rule:
|
if apply_rule:
|
||||||
key = apply_rule
|
key = apply_rule
|
||||||
rule = rules[apply_rule]
|
rule = rules[apply_rule]
|
||||||
_try_rule(key, rule, target, access_data, o)
|
_try_rule(key, rule, target_data, access_data, o)
|
||||||
return
|
return
|
||||||
for key, rule in rules.items():
|
for key, rule in rules.items():
|
||||||
if ":" in key:
|
if ":" in key:
|
||||||
_try_rule(key, rule, target, access_data, o)
|
_try_rule(key, rule, target_data, access_data, o)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -72,6 +95,11 @@ def main():
|
|||||||
type=argparse.FileType('rb', 0),
|
type=argparse.FileType('rb', 0),
|
||||||
help='path to a file containing OpenStack Identity API' +
|
help='path to a file containing OpenStack Identity API' +
|
||||||
' access info in JSON format')
|
' access info in JSON format')
|
||||||
|
parser.add_argument(
|
||||||
|
'--target',
|
||||||
|
type=argparse.FileType('rb', 0),
|
||||||
|
help='path to a file containing custom target info in' +
|
||||||
|
' JSON format. This will be used to evaluate the policy with.')
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'--rule',
|
'--rule',
|
||||||
help='rule to test')
|
help='rule to test')
|
||||||
@ -85,7 +113,7 @@ def main():
|
|||||||
is_admin = args.is_admin.lower() == "true"
|
is_admin = args.is_admin.lower() == "true"
|
||||||
except Exception:
|
except Exception:
|
||||||
is_admin = False
|
is_admin = False
|
||||||
tool(args.policy, args.access, args.rule, is_admin)
|
tool(args.policy, args.access, args.rule, is_admin, args.target)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -62,6 +62,39 @@ class CheckerTestCase(base.PolicyBaseTestCase):
|
|||||||
'''
|
'''
|
||||||
self.assertEqual(expected, stdout.getvalue())
|
self.assertEqual(expected, stdout.getvalue())
|
||||||
|
|
||||||
|
@mock.patch("oslo_policy._checks.TrueCheck.__call__")
|
||||||
|
def test_pass_rule_parameters_with_custom_target(self, call_mock):
|
||||||
|
apply_rule = None
|
||||||
|
is_admin = False
|
||||||
|
access_data = token_fixture.SCOPED_TOKEN_FIXTURE["token"]
|
||||||
|
access_data['roles'] = [
|
||||||
|
role['name'] for role in access_data['roles']]
|
||||||
|
access_data['project_id'] = access_data['project']['id']
|
||||||
|
access_data['is_admin'] = is_admin
|
||||||
|
|
||||||
|
sample_target = {
|
||||||
|
"project_id": access_data["project"]["id"],
|
||||||
|
"domain_id": access_data["project"]["domain"]["id"]
|
||||||
|
}
|
||||||
|
self.create_config_file(
|
||||||
|
"target.json",
|
||||||
|
jsonutils.dumps(sample_target))
|
||||||
|
|
||||||
|
policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
|
||||||
|
access_file = open(self.get_config_file_fullname('access.json'), 'r')
|
||||||
|
target_file = open(self.get_config_file_fullname('target.json'), 'r')
|
||||||
|
stdout = self._capture_stdout()
|
||||||
|
|
||||||
|
shell.tool(policy_file, access_file, apply_rule, is_admin,
|
||||||
|
target_file)
|
||||||
|
call_mock.assert_called_once_with(
|
||||||
|
sample_target, access_data, mock.ANY,
|
||||||
|
current_rule="sampleservice:sample_rule")
|
||||||
|
|
||||||
|
expected = '''passed: sampleservice:sample_rule
|
||||||
|
'''
|
||||||
|
self.assertEqual(expected, stdout.getvalue())
|
||||||
|
|
||||||
def test_all_nonadmin(self):
|
def test_all_nonadmin(self):
|
||||||
|
|
||||||
policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
|
policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
|
||||||
@ -75,3 +108,30 @@ class CheckerTestCase(base.PolicyBaseTestCase):
|
|||||||
expected = '''passed: sampleservice:sample_rule
|
expected = '''passed: sampleservice:sample_rule
|
||||||
'''
|
'''
|
||||||
self.assertEqual(expected, stdout.getvalue())
|
self.assertEqual(expected, stdout.getvalue())
|
||||||
|
|
||||||
|
def test_flatten_from_dict(self):
|
||||||
|
target = {
|
||||||
|
"target": {
|
||||||
|
"secret": {
|
||||||
|
"project_id": "1234"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = shell.flatten(target)
|
||||||
|
self.assertEqual(result, {"target.secret.project_id": "1234"})
|
||||||
|
|
||||||
|
def test_flatten_from_file(self):
|
||||||
|
target = {
|
||||||
|
"target": {
|
||||||
|
"secret": {
|
||||||
|
"project_id": "1234"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.create_config_file(
|
||||||
|
"target.json",
|
||||||
|
jsonutils.dumps(target))
|
||||||
|
target_file = open(self.get_config_file_fullname('target.json'), 'r')
|
||||||
|
target_from_file = target_file.read()
|
||||||
|
result = shell.flatten(jsonutils.loads(target_from_file))
|
||||||
|
self.assertEqual(result, {"target.secret.project_id": "1234"})
|
||||||
|
@ -0,0 +1,6 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
oslopolicy-checker was added the ability to accept a file containing a hash
|
||||||
|
that represents the target. This makes it possible to check policies that
|
||||||
|
have non-conventional targets such as barbican.
|
Loading…
Reference in New Issue
Block a user