
We found out that when policy files are generated, they don't include deprecated policies. This can be problematic for services that use rendered policy files because they could still be using the deprecated policy name and not the new policy name. This commit makes sure we render the deprecated policy name as an alias to the new policy name, so that operators don't have to change two policies if they override a deprecated policy. Closes-Bug: 1742569 Change-Id: Iaf0c89a035775770ceaa230c65ce8eb195b9d82b
488 lines
21 KiB
Python
488 lines
21 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
# # Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import operator
|
|
import sys
|
|
|
|
import fixtures
|
|
import mock
|
|
from oslo_config import cfg
|
|
from six import moves
|
|
import stevedore
|
|
import testtools
|
|
|
|
from oslo_policy import generator
|
|
from oslo_policy import policy
|
|
from oslo_policy.tests import base
|
|
|
|
|
|
# Rule defaults shared by the test cases in this module. Each key is used as
# the name of a fake stevedore extension whose object is the list of rules.
OPTS = {
    'base_rules': [
        policy.RuleDefault(
            name='admin',
            check_str='is_admin:True',
            description='Basic admin check',
        ),
        # The description is deliberately long: the expected sample output in
        # the tests shows it wrapped across two comment lines.
        policy.DocumentedRuleDefault(
            'owner',
            'project_id:%(project_id)s',
            ('This is a long description to check that line wrapping '
             'functions properly'),
            [
                {'path': '/foo/', 'method': 'GET'},
                {'path': '/test/', 'method': 'POST'},
            ],
        ),
    ],
    'custom_field': [
        policy.RuleDefault('shared', 'field:networks:shared=True'),
    ],
    'rules': [
        policy.RuleDefault('admin_or_owner', 'rule:admin or rule:owner'),
    ],
}
|
|
|
|
|
|
class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase):
    """Sample-policy generation tests that use the default output format.

    Every test replaces ``stevedore.named.NamedExtensionManager`` with a test
    instance, so ``generator._generate_sample`` only sees the rule lists that
    the test hands it.
    """

    # NOTE(review): the ``expected`` literals below are compared byte-for-byte
    # with what the generator writes, so their internal spacing and blank
    # lines are significant -- confirm them against a real generator run.

    def setUp(self):
        super(GenerateSampleYAMLTestCase, self).setUp()
        self.enforcer = policy.Enforcer(self.conf, policy_file='policy.yaml')

    def _capture_stdout(self):
        """Replace ``sys.stdout`` with a ``StringIO`` and return the buffer."""
        self.useFixture(fixtures.MonkeyPatch('sys.stdout', moves.StringIO()))
        return sys.stdout

    @staticmethod
    def _make_test_manager(rules_by_name, namespace):
        """Build a test ``NamedExtensionManager`` exposing the given rules.

        :param rules_by_name: dict mapping an extension name to the object
                              (a list of rule defaults) the extension holds.
        :param namespace: passed through unchanged as ``namespace`` to
                          ``make_test_instance``.
        :returns: the manager created by ``make_test_instance``.
        """
        extensions = [
            stevedore.extension.Extension(name=name, entry_point=None,
                                          plugin=None, obj=rules)
            for name, rules in rules_by_name.items()
        ]
        return stevedore.named.NamedExtensionManager.make_test_instance(
            extensions=extensions, namespace=namespace)

    def test_generate_loadable_yaml(self):
        """A sample generated without help text loads into an enforcer."""
        test_mgr = self._make_test_manager(OPTS, ['base_rules', 'rules'])

        output_file = self.get_config_file_fullname('policy.yaml')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            # generate sample-policy file with only rules
            generator._generate_sample(['base_rules', 'rules'], output_file,
                                       include_help=False)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['base_rules', 'rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        self.enforcer.load_rules()

        self.assertIn('owner', self.enforcer.rules)
        self.assertIn('admin', self.enforcer.rules)
        self.assertIn('admin_or_owner', self.enforcer.rules)
        self.assertEqual('project_id:%(project_id)s',
                         str(self.enforcer.rules['owner']))
        self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
        self.assertEqual('(rule:admin or rule:owner)',
                         str(self.enforcer.rules['admin_or_owner']))

    def test_expected_content(self):
        """The sample written to a file matches the expected text exactly."""
        test_mgr = self._make_test_manager(OPTS, ['base_rules', 'rules'])

        expected = '''# Basic admin check
#"admin": "is_admin:True"

# This is a long description to check that line wrapping functions
# properly
# GET /foo/
# POST /test/
#"owner": "project_id:%(project_id)s"

#
#"shared": "field:networks:shared=True"

#
#"admin_or_owner": "rule:admin or rule:owner"

'''
        output_file = self.get_config_file_fullname('policy.yaml')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            generator._generate_sample(['base_rules', 'rules'], output_file)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['base_rules', 'rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        with open(output_file, 'r') as written_file:
            written_policy = written_file.read()

        self.assertEqual(expected, written_policy)

    def test_expected_content_stdout(self):
        """With ``output_file=None`` the same sample text goes to stdout."""
        test_mgr = self._make_test_manager(OPTS, ['base_rules', 'rules'])

        expected = '''# Basic admin check
#"admin": "is_admin:True"

# This is a long description to check that line wrapping functions
# properly
# GET /foo/
# POST /test/
#"owner": "project_id:%(project_id)s"

#
#"shared": "field:networks:shared=True"

#
#"admin_or_owner": "rule:admin or rule:owner"

'''
        stdout = self._capture_stdout()
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            generator._generate_sample(['base_rules', 'rules'],
                                       output_file=None)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['base_rules', 'rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        self.assertEqual(expected, stdout.getvalue())

    def test_deprecated_policies_are_aliased_to_new_names(self):
        """A deprecated policy name is rendered as an alias of the new one."""
        deprecated_rule = policy.DeprecatedRule(
            name='foo:post_bar',
            check_str='role:fizz'
        )
        new_rule = policy.RuleDefault(
            name='foo:create_bar',
            check_str='role:fizz',
            description='Create a bar.',
            deprecated_rule=deprecated_rule,
            deprecated_reason=(
                'foo:post_bar is being removed in favor of foo:create_bar'
            ),
            deprecated_since='N'
        )
        # Kept under its own name: the previous loop rebound ``opts`` while
        # iterating over ``opts.items()``, which only worked by accident.
        rules_by_name = {'rules': [new_rule]}
        test_mgr = self._make_test_manager(rules_by_name, ['rules'])

        expected = '''# DEPRECATED
# "foo:post_bar":"role:fizz" has been deprecated since N in favor of
# "foo:create_bar":"role:fizz".
"foo:post_bar": "rule:foo:create_bar"
'''
        stdout = self._capture_stdout()
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            generator._generate_sample(['rules'], output_file=None)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True
            )
        self.assertEqual(expected, stdout.getvalue())

    def test_empty_line_formatting(self):
        """An empty description line is rendered as a bare ``#``."""
        rule = [policy.RuleDefault('admin', 'is_admin:True',
                                   description='Check Summary \n'
                                               '\n'
                                               'This is a description to '
                                               'check that empty line has '
                                               'no white spaces.')]
        test_mgr = self._make_test_manager({'check_rule': rule},
                                           ['check_rule'])

        # no whitespace on empty line
        expected = '''# Check Summary
#
# This is a description to check that empty line has no white spaces.
#"admin": "is_admin:True"

'''
        output_file = self.get_config_file_fullname('policy.yaml')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            generator._generate_sample(['check_rule'], output_file)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['check_rule'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        with open(output_file, 'r') as written_file:
            written_policy = written_file.read()

        self.assertEqual(expected, written_policy)
|
|
|
|
|
|
class GenerateSampleJSONTestCase(base.PolicyBaseTestCase):
    """Sample-policy generation tests that request ``output_format='json'``.

    Every test replaces ``stevedore.named.NamedExtensionManager`` with a test
    instance, so ``generator._generate_sample`` only sees the rule lists in
    ``OPTS``.
    """

    # NOTE(review): the ``expected`` literals below are compared byte-for-byte
    # with what the generator writes, so any indentation of the JSON keys is
    # significant -- confirm them against a real generator run.

    def setUp(self):
        super(GenerateSampleJSONTestCase, self).setUp()
        self.enforcer = policy.Enforcer(self.conf, policy_file='policy.json')

    def _capture_stdout(self):
        """Replace ``sys.stdout`` with a ``StringIO`` and return the buffer."""
        self.useFixture(fixtures.MonkeyPatch('sys.stdout', moves.StringIO()))
        return sys.stdout

    @staticmethod
    def _make_test_manager(rules_by_name, namespace):
        """Build a test ``NamedExtensionManager`` exposing the given rules.

        :param rules_by_name: dict mapping an extension name to the object
                              (a list of rule defaults) the extension holds.
        :param namespace: passed through unchanged as ``namespace`` to
                          ``make_test_instance``.
        :returns: the manager created by ``make_test_instance``.
        """
        extensions = [
            stevedore.extension.Extension(name=name, entry_point=None,
                                          plugin=None, obj=rules)
            for name, rules in rules_by_name.items()
        ]
        return stevedore.named.NamedExtensionManager.make_test_instance(
            extensions=extensions, namespace=namespace)

    def test_generate_loadable_json(self):
        """A JSON sample generated without help text loads into an enforcer."""
        test_mgr = self._make_test_manager(OPTS, ['base_rules', 'rules'])

        output_file = self.get_config_file_fullname('policy.json')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            # generate sample-policy file with only rules
            generator._generate_sample(['base_rules', 'rules'], output_file,
                                       output_format='json',
                                       include_help=False)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['base_rules', 'rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        self.enforcer.load_rules()

        self.assertIn('owner', self.enforcer.rules)
        self.assertIn('admin', self.enforcer.rules)
        self.assertIn('admin_or_owner', self.enforcer.rules)
        self.assertEqual('project_id:%(project_id)s',
                         str(self.enforcer.rules['owner']))
        self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
        self.assertEqual('(rule:admin or rule:owner)',
                         str(self.enforcer.rules['admin_or_owner']))

    def test_expected_content(self):
        """The JSON sample written to a file matches the expected text."""
        test_mgr = self._make_test_manager(OPTS, ['base_rules', 'rules'])

        expected = '''{
"admin": "is_admin:True",
"owner": "project_id:%(project_id)s",
"shared": "field:networks:shared=True",
"admin_or_owner": "rule:admin or rule:owner"
}
'''
        output_file = self.get_config_file_fullname('policy.json')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            generator._generate_sample(['base_rules', 'rules'],
                                       output_file=output_file,
                                       output_format='json')
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['base_rules', 'rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        with open(output_file, 'r') as written_file:
            written_policy = written_file.read()

        self.assertEqual(expected, written_policy)

    def test_expected_content_stdout(self):
        """With ``output_file=None`` the same JSON text goes to stdout."""
        test_mgr = self._make_test_manager(OPTS, ['base_rules', 'rules'])

        expected = '''{
"admin": "is_admin:True",
"owner": "project_id:%(project_id)s",
"shared": "field:networks:shared=True",
"admin_or_owner": "rule:admin or rule:owner"
}
'''
        stdout = self._capture_stdout()
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=test_mgr) as mock_ext_mgr:
            generator._generate_sample(['base_rules', 'rules'],
                                       output_file=None,
                                       output_format='json')
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.policies', names=['base_rules', 'rules'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        self.assertEqual(expected, stdout.getvalue())
|
|
|
|
|
|
class GeneratorRaiseErrorTestCase(testtools.TestCase):
    """Checks that the generator entry points propagate failures."""

    def test_generator_raises_error(self):
        """Verifies that errors from extension manager are not suppressed."""
        class FakeException(Exception):
            pass

        class FakeEP(object):
            # Stand-in entry point: resolving, requiring or loading it raises.

            def __init__(self):
                self.name = 'callback_is_expected'
                self.require = self.load = self.resolve

            def resolve(self, *args, **kwargs):
                raise FakeException()

        broken_ep = FakeEP()
        # Make entry point discovery return only the broken entry point.
        iter_eps = mock.Mock(return_value=[broken_ep])
        with mock.patch('pkg_resources.iter_entry_points', iter_eps):
            self.assertRaises(FakeException,
                              generator._generate_sample,
                              broken_ep.name)

    def test_generator_call_with_no_arguments_raises_error(self):
        """``generate_sample([])`` raises ``cfg.RequiredOptError``."""
        argv = ['oslopolicy-sample-generator']
        with mock.patch('sys.argv', argv):
            self.assertRaises(cfg.RequiredOptError,
                              generator.generate_sample,
                              [])
|
|
|
|
|
|
class GeneratePolicyTestCase(base.PolicyBaseTestCase):
    """Tests for ``generator._generate_policy``."""

    def setUp(self):
        super(GeneratePolicyTestCase, self).setUp()

    def test_merged_rules(self):
        """The merged file has the file's rules plus unlisted defaults.

        ``admin`` is registered with a different check string than the sample
        file holds, and the assertions expect the file's value in the merged
        output; ``foo`` exists only as a registered default and is expected to
        appear as well.
        """
        rule_extensions = [
            stevedore.extension.Extension(name=ext_name, entry_point=None,
                                          plugin=None, obj=rule_list)
            for ext_name, rule_list in OPTS.items()
        ]
        rules_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
            extensions=rule_extensions, namespace=['base_rules', 'rules'])

        # Write the policy file for an enforcer to load
        sample_path = self.get_config_file_fullname('policy-sample.yaml')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=rules_mgr):
            # generate sample-policy file with only rules
            generator._generate_sample(['base_rules', 'rules'], sample_path,
                                       include_help=False)

        enforcer = policy.Enforcer(self.conf, policy_file='policy-sample.yaml')
        # register an opt defined in the file
        admin_default = policy.RuleDefault('admin', 'is_admin:False')
        enforcer.register_default(admin_default)
        # register a new opt
        foo_default = policy.RuleDefault('foo', 'role:foo')
        enforcer.register_default(foo_default)

        # Mock out stevedore to return the configured enforcer
        enforcer_ext = stevedore.extension.Extension(
            name='testing', entry_point=None, plugin=None, obj=enforcer)
        merge_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
            extensions=[enforcer_ext], namespace='testing')

        # Generate a merged file
        merged_path = self.get_config_file_fullname('policy-merged.yaml')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=merge_mgr) as mock_ext_mgr:
            generator._generate_policy(namespace='testing',
                                       output_file=merged_path)
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.enforcer', names=['testing'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        # load the merged file with a new enforcer
        merged_enforcer = policy.Enforcer(self.conf,
                                          policy_file='policy-merged.yaml')
        merged_enforcer.load_rules()
        for rule_name in ('admin', 'owner', 'admin_or_owner', 'foo'):
            self.assertIn(rule_name, merged_enforcer.rules)

        merged_rules = merged_enforcer.rules
        self.assertEqual('is_admin:True', str(merged_rules['admin']))
        self.assertEqual('role:foo', str(merged_rules['foo']))
|
|
|
|
|
|
class ListRedundantTestCase(base.PolicyBaseTestCase):
    """Tests for ``generator._list_redundant``."""

    def setUp(self):
        super(ListRedundantTestCase, self).setUp()

    def _capture_stdout(self):
        """Replace ``sys.stdout`` with a ``StringIO`` and return the buffer."""
        buffer_fixture = fixtures.MonkeyPatch('sys.stdout', moves.StringIO())
        self.useFixture(buffer_fixture)
        return sys.stdout

    def test_matched_rules(self):
        """Rules whose file value equals the registered default are listed."""
        rule_extensions = [
            stevedore.extension.Extension(name=ext_name, entry_point=None,
                                          plugin=None, obj=rule_list)
            for ext_name, rule_list in OPTS.items()
        ]
        rules_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
            extensions=rule_extensions, namespace=['base_rules', 'rules'])

        # Write the policy file for an enforcer to load
        sample_path = self.get_config_file_fullname('policy-sample.yaml')
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=rules_mgr):
            # generate sample-policy file with only rules
            generator._generate_sample(['base_rules', 'rules'], sample_path,
                                       include_help=False)

        enforcer = policy.Enforcer(self.conf, policy_file='policy-sample.yaml')
        # register opts that match those defined in policy-sample.yaml
        admin_default = policy.RuleDefault('admin', 'is_admin:True')
        enforcer.register_default(admin_default)
        owner_default = policy.RuleDefault('owner',
                                           'project_id:%(project_id)s')
        enforcer.register_default(owner_default)
        # register a new opt
        enforcer.register_default(policy.RuleDefault('foo', 'role:foo'))

        # Mock out stevedore to return the configured enforcer
        enforcer_ext = stevedore.extension.Extension(
            name='testing', entry_point=None, plugin=None, obj=enforcer)
        list_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
            extensions=[enforcer_ext], namespace='testing')

        stdout = self._capture_stdout()
        with mock.patch('stevedore.named.NamedExtensionManager',
                        return_value=list_mgr) as mock_ext_mgr:
            generator._list_redundant(namespace='testing')
            mock_ext_mgr.assert_called_once_with(
                'oslo.policy.enforcer', names=['testing'],
                on_load_failure_callback=generator.on_load_failure_callback,
                invoke_on_load=True)

        # Split each printed line once on ': ' and order the pairs by their
        # first field so the checks below do not depend on print order.
        printed_lines = stdout.getvalue().splitlines()
        matches = sorted((line.split(': ', 1) for line in printed_lines),
                         key=operator.itemgetter(0))

        # Should be 'admin'
        admin_match = matches[0]
        self.assertEqual('"admin"', admin_match[0])
        self.assertEqual('"is_admin:True"', admin_match[1])

        # Should be 'owner'
        owner_match = matches[1]
        self.assertEqual('"owner"', owner_match[0])
        self.assertEqual('"project_id:%(project_id)s"', owner_match[1])
|