424ff50a4f
collections.MutableMapping has been deprecated since Python 3.3 and is removed in Python 3.10. The functionality should be identical. Change-Id: Ic96309fef409ba01dd24a3a70ff132a9f5352f9c
152 lines
4.3 KiB
Python
152 lines
4.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import collections.abc
|
|
import sys
|
|
|
|
from oslo_serialization import jsonutils
|
|
|
|
from oslo_config import cfg
|
|
from oslo_policy import opts
|
|
from oslo_policy import policy
|
|
|
|
|
|
class FakeEnforcer(object):
|
|
def __init__(self, rules, config):
|
|
self.rules = rules
|
|
self.conf = None
|
|
|
|
if config:
|
|
self.conf = cfg.ConfigOpts()
|
|
|
|
for group, options in opts.list_opts():
|
|
self.conf.register_opts(options, group)
|
|
|
|
self.conf(["--config-file={}".format(config)])
|
|
|
|
|
|
def _try_rule(key, rule, target, access_data, o):
|
|
try:
|
|
result = rule(target, access_data, o, current_rule=key)
|
|
if result:
|
|
print("passed: %s" % key)
|
|
else:
|
|
print("failed: %s" % key)
|
|
except Exception as e:
|
|
print(e)
|
|
print("exception: %s" % rule)
|
|
|
|
|
|
def flatten(d, parent_key=''):
|
|
"""Flatten a nested dictionary
|
|
|
|
Converts a dictionary with nested values to a single level flat
|
|
dictionary, with dotted notation for each key.
|
|
|
|
"""
|
|
items = []
|
|
for k, v in d.items():
|
|
new_key = parent_key + '.' + k if parent_key else k
|
|
if isinstance(v, collections.abc.MutableMapping):
|
|
items.extend(flatten(v, new_key).items())
|
|
else:
|
|
items.append((new_key, v))
|
|
return dict(items)
|
|
|
|
|
|
def tool(policy_file, access_file, apply_rule, is_admin=False,
|
|
target_file=None, enforcer_config=None):
|
|
|
|
with open(access_file, "rb", 0) as a:
|
|
access = a.read()
|
|
|
|
access_data = jsonutils.loads(access)['token']
|
|
access_data['roles'] = [role['name'] for role in access_data['roles']]
|
|
access_data['user_id'] = access_data['user']['id']
|
|
if access_data.get('project'):
|
|
access_data['project_id'] = access_data['project']['id']
|
|
if access_data.get('system'):
|
|
access_data['system_scope'] = 'all'
|
|
access_data['is_admin'] = is_admin
|
|
|
|
with open(policy_file, "rb", 0) as p:
|
|
policy_data = p.read()
|
|
|
|
rules = policy.Rules.load(policy_data, "default")
|
|
|
|
enforcer = FakeEnforcer(rules, enforcer_config)
|
|
|
|
if target_file:
|
|
with open(target_file, "rb", 0) as t:
|
|
target = t.read()
|
|
|
|
target_data = flatten(jsonutils.loads(target))
|
|
else:
|
|
target_data = {'user_id': access_data['user']['id']}
|
|
if access_data.get('project_id'):
|
|
target_data['project_id'] = access_data['project_id']
|
|
|
|
if apply_rule:
|
|
key = apply_rule
|
|
rule = rules[apply_rule]
|
|
_try_rule(key, rule, target_data, access_data, enforcer)
|
|
return
|
|
|
|
for key, rule in sorted(rules.items()):
|
|
if ":" in key:
|
|
_try_rule(key, rule, target_data, access_data, enforcer)
|
|
|
|
|
|
def main():
|
|
conf = cfg.ConfigOpts()
|
|
|
|
conf.register_cli_opt(cfg.StrOpt(
|
|
'policy',
|
|
required=True,
|
|
help='path to a policy file.'))
|
|
|
|
conf.register_cli_opt(cfg.StrOpt(
|
|
'access',
|
|
required=True,
|
|
help='path to a file containing OpenStack Identity API '
|
|
'access info in JSON format.'))
|
|
|
|
conf.register_cli_opt(cfg.StrOpt(
|
|
'target',
|
|
help='path to a file containing custom target info in '
|
|
'JSON format. This will be used to evaluate the policy with.'))
|
|
|
|
conf.register_cli_opt(cfg.StrOpt(
|
|
'rule',
|
|
help='rule to test.'))
|
|
|
|
conf.register_cli_opt(cfg.BoolOpt(
|
|
'is_admin',
|
|
help='set is_admin=True on the credentials used for the evaluation.',
|
|
default=False))
|
|
|
|
conf.register_cli_opt(cfg.StrOpt(
|
|
'enforcer_config',
|
|
help='configuration file for the oslopolicy-checker enforcer'))
|
|
|
|
conf()
|
|
|
|
tool(conf.policy, conf.access, conf.rule, conf.is_admin,
|
|
conf.target, conf.enforcer_config)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|