Add sample file generation script and helper methods

This adds a console script to oslo.policy that will output a sample
policy file in yaml format. It works by looking at the configured
namespace(s) under an 'oslo.policy.policies' entry point. A method, or
methods, should be provided which return a list of
oslo_policy.policy.RuleDefault objects.

To use this script add an entry to setup.cfg in a project with something
like:

oslo.policy.policies =
    nova.api = nova.api.opts:list_policies

list_policies should be a method which returns a list of
oslo_policy.policy.RuleDefault objects.

Then run it like:

oslopolicy-sample-generator --namespace nova.api

--output-file can also be specified, or those options can be configured
in a file which can be specified with --config-file.

Change-Id: If25d48313b91a6610119220e13f635c6e28b2a59
Partially-Implements: bp policy-sample-generation
This commit is contained in:
Andrew Laski 2016-05-09 15:06:00 -04:00
parent ea29939194
commit 474c120ae6
4 changed files with 313 additions and 2 deletions

View File

@ -53,6 +53,8 @@ benefits.
Enforcer.enforce.
* More will be documented as capabilities are added.
* A sample policy file can be generated based on the registered policies
rather than needing to manually maintain one.
How to register
---------------
@ -74,3 +76,33 @@ How to register
enforcer.register_default(policy.RuleDefault('identity:create_region',
'rule:admin_required',
description='helpful text'))
Sample file generation
----------------------
In setup.cfg of a project using oslo.policy::
[entry_points]
oslo.policy.policies =
nova.api = nova.api.opts:list_policies
nova.compute.api = nova.compute.api.opts:list_policies
where list_policies is a method that returns a list of policy.RuleDefault
objects.
Run the oslopolicy-sample-generator script with some configuration options::
oslopolicy-sample-generator --namespace nova.api --namespace nova.compute.api --output-file policy-sample.yaml
or::
oslopolicy-sample-generator --config-file policy-generator.conf
where policy-generator.conf looks like::
[DEFAULT]
output_file = policy-sample.yaml
namespace = nova.api
namespace = nova.compute.api
If output_file is ommitted the sample file will be sent to stdout.

130
oslo_policy/generator.py Normal file
View File

@ -0,0 +1,130 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import sys
import textwrap
from oslo_config import cfg
import stevedore
LOG = logging.getLogger(__name__)
_generator_opts = [
cfg.StrOpt('output-file',
help='Path of the file to write to. Defaults to stdout.'),
cfg.MultiStrOpt('namespace',
required=True,
help='Option namespace(s) under "oslo.policy.policies" in '
'which to query for options.'),
]
def _get_policies_dict(namespaces):
"""Find the options available via the given namespaces.
:param namespaces: a list of namespaces registered under
'oslo.policy.policies'
:returns: a dict of {namespace1: [rule_default_1, rule_default_2],
namespace2: [rule_default_3]...}
"""
mgr = stevedore.named.NamedExtensionManager(
'oslo.policy.policies',
names=namespaces,
on_load_failure_callback=on_load_failure_callback,
invoke_on_load=True)
opts = {ep.name: ep.obj for ep in mgr}
return opts
def _format_help_text(description):
"""Format a comment for a policy based on the description provided.
:param description: A string with helpful text.
:returns: A line wrapped comment, or blank comment if description is None
"""
if not description:
return '#'
return textwrap.fill(description, 70, initial_indent='# ',
subsequent_indent='# ',
break_long_words=False,
replace_whitespace=False)
def _format_rule_default_yaml(default, include_help=True):
"""Create a yaml node from the provided policy.RuleDefault.
:param default: A policy.RuleDefault object
:returns: A string containing a yaml representation of the RuleDefault
"""
text = ('"%(name)s": "%(check_str)s"\n' %
{'name': default.name,
'check_str': default.check_str})
if include_help:
text = ('%(help)s\n%(text)s' %
{'help': _format_help_text(default.description),
'text': text})
return text
def _sort_and_format_by_section(policies, include_help=True):
"""Generate a list of policy section texts
The text for a section will be created and returned one at a time. The
sections are sorted first to provide for consistent output.
Text is created in yaml format. This is done manually because PyYaml
does not facilitate outputing comments.
:param policies: A dict of {section1: [rule_default_1, rule_default_2],
section2: [rule_default_3]}
"""
for section in sorted(policies.keys()):
rule_defaults = policies[section]
for rule_default in rule_defaults:
yield _format_rule_default_yaml(rule_default,
include_help=include_help)
def _generate_sample(namespaces, output_file=None):
"""Generate a sample policy file.
List all of the policies available via the namespace specified in the
given configuration and write them to the specified output file.
:param namespaces: a list of namespaces registered under
'oslo.policy.policies'. Stevedore will look here for
policy options.
:param output_file: The path of a file to output to. stdout used if None.
"""
policies = _get_policies_dict(namespaces)
output_file = (open(output_file, 'w') if output_file
else sys.stdout)
for section in _sort_and_format_by_section(policies):
output_file.write(section)
def on_load_failure_callback(*args, **kwargs):
raise
def generate_sample(args=None):
logging.basicConfig(level=logging.WARN)
conf = cfg.ConfigOpts()
conf.register_cli_opts(_generator_opts)
conf.register_opts(_generator_opts)
conf(args)
_generate_sample(conf.namespace, conf.output_file)

View File

@ -0,0 +1,150 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import fixtures
import mock
from oslo_config import cfg
from six import moves
import testtools
from oslo_policy import generator
from oslo_policy import policy
from oslo_policy.tests import base
OPTS = {'base_rules': [policy.RuleDefault('admin', 'is_admin:True',
description='Basic admin check'),
policy.RuleDefault('owner',
'project_id:%(project_id)s',
description='This is a long '
'description to check '
'that line wrapping '
'functions properly')],
'custom_field': [policy.RuleDefault('shared',
'field:networks:shared=True')],
'rules': [policy.RuleDefault('admin_or_owner',
'rule:admin or rule:owner')],
}
class GenerateSampleTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(GenerateSampleTestCase, self).setUp()
self.enforcer = policy.Enforcer(self.conf, policy_file='policy.yaml')
def _capture_stdout(self):
self.useFixture(fixtures.MonkeyPatch('sys.stdout', moves.StringIO()))
return sys.stdout
@mock.patch('stevedore.named.NamedExtensionManager')
def test_generate_loadable_yaml(self, mock_named_mgr):
mock_eps = []
for name, opts in OPTS.items():
mock_ep = mock.Mock()
mock_ep.configure_mock(name=name, obj=opts)
mock_eps.append(mock_ep)
mock_named_mgr.return_value = mock_eps
output_file = self.get_config_file_fullname('policy.yaml')
generator._generate_sample(['base_rules', 'rules'], output_file)
self.enforcer.load_rules()
self.assertIn('owner', self.enforcer.rules)
self.assertIn('admin', self.enforcer.rules)
self.assertIn('admin_or_owner', self.enforcer.rules)
self.assertEqual('project_id:%(project_id)s',
str(self.enforcer.rules['owner']))
self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
self.assertEqual('(rule:admin or rule:owner)',
str(self.enforcer.rules['admin_or_owner']))
@mock.patch('stevedore.named.NamedExtensionManager')
def test_expected_content(self, mock_named_mgr):
mock_eps = []
for name, opts in OPTS.items():
mock_ep = mock.Mock()
mock_ep.configure_mock(name=name, obj=opts)
mock_eps.append(mock_ep)
mock_named_mgr.return_value = mock_eps
expected = '''# Basic admin check
"admin": "is_admin:True"
# This is a long description to check that line wrapping functions
# properly
"owner": "project_id:%(project_id)s"
#
"shared": "field:networks:shared=True"
#
"admin_or_owner": "rule:admin or rule:owner"
'''
output_file = self.get_config_file_fullname('policy.yaml')
generator._generate_sample(['base_rules', 'rules'], output_file)
with open(output_file, 'r') as written_file:
written_policy = written_file.read()
self.assertEqual(expected, written_policy)
@mock.patch('stevedore.named.NamedExtensionManager')
def test_expected_content_stdout(self, mock_named_mgr):
mock_eps = []
for name, opts in OPTS.items():
mock_ep = mock.Mock()
mock_ep.configure_mock(name=name, obj=opts)
mock_eps.append(mock_ep)
mock_named_mgr.return_value = mock_eps
expected = '''# Basic admin check
"admin": "is_admin:True"
# This is a long description to check that line wrapping functions
# properly
"owner": "project_id:%(project_id)s"
#
"shared": "field:networks:shared=True"
#
"admin_or_owner": "rule:admin or rule:owner"
'''
stdout = self._capture_stdout()
generator._generate_sample(['base_rules', 'rules'], output_file=None)
self.assertEqual(expected, stdout.getvalue())
class GeneratorRaiseErrorTestCase(testtools.TestCase):
def test_generator_raises_error(self):
"""Verifies that errors from extension manager are not suppressed."""
class FakeException(Exception):
pass
class FakeEP(object):
def __init__(self):
self.name = 'callback_is_expected'
self.require = self.resolve
self.load = self.resolve
def resolve(self, *args, **kwargs):
raise FakeException()
fake_ep = FakeEP()
fake_eps = mock.Mock(return_value=[fake_ep])
with mock.patch('pkg_resources.iter_entry_points', fake_eps):
self.assertRaises(FakeException, generator._generate_sample,
fake_ep.name)
def test_generator_call_with_no_arguments_raises_error(self):
testargs = ['oslopolicy-sample-generator']
with mock.patch('sys.argv', testargs):
self.assertRaises(cfg.RequiredOptError, generator.generate_sample,
[])

View File

@ -32,8 +32,7 @@ oslo.config.opts =
console_scripts =
oslopolicy-checker = oslo_policy.shell:main
oslopolicy-sample-generator = oslo_policy.generator:generate_sample
[build_sphinx]
source-dir = doc/source