Create the temporary files needed for tests

Rather than keeping test input files in git, create them on the fly as
needed. This allows us to streamline one or two tests that modify the
files and then try to restore their old contents, and it also ensures
that looking at a given test it is clear which files are really being
used.

This change also updates the implementation of some tests that were
counting log calls to see how many files were loaded by making the
Enforcer track the files it loads so we can look at the list explicitly.
This gives us verification that the files we care about are the ones
being loaded, and is less brittle in case we change the logging
structure within the policy module.

Change-Id: I109b3d5afc622df7e5e7a12d06c4fb1baad18b1a
This commit is contained in:
Doug Hellmann 2015-02-17 17:33:13 -05:00
parent fc7da183c7
commit b046f0552e
9 changed files with 109 additions and 52 deletions

View File

@ -339,6 +339,7 @@ class Enforcer(object):
self.policy_file = policy_file or self.conf.oslo_policy.policy_file self.policy_file = policy_file or self.conf.oslo_policy.policy_file
self.use_conf = use_conf self.use_conf = use_conf
self.overwrite = overwrite self.overwrite = overwrite
self._loaded_files = []
def set_rules(self, rules, overwrite=True, use_conf=False): def set_rules(self, rules, overwrite=True, use_conf=False):
"""Create a new :class:`Rules` based on the provided dict of rules. """Create a new :class:`Rules` based on the provided dict of rules.
@ -364,6 +365,7 @@ class Enforcer(object):
fileutils.delete_cached_file(self.policy_path) fileutils.delete_cached_file(self.policy_path)
self.default_rule = None self.default_rule = None
self.policy_path = None self.policy_path = None
self._loaded_files = []
def load_rules(self, force_reload=False): def load_rules(self, force_reload=False):
"""Loads policy_path's rules. """Loads policy_path's rules.
@ -405,6 +407,7 @@ class Enforcer(object):
if reloaded or not self.rules or not overwrite: if reloaded or not self.rules or not overwrite:
rules = Rules.load_json(data, self.default_rule) rules = Rules.load_json(data, self.default_rule)
self.set_rules(rules, overwrite=overwrite, use_conf=True) self.set_rules(rules, overwrite=overwrite, use_conf=True)
self._loaded_files.append(path)
LOG.debug('Reloaded policy file: %(path)s', LOG.debug('Reloaded policy file: %(path)s',
{'path': path}) {'path': path})

View File

@ -12,8 +12,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import codecs
import os
import os.path import os.path
import fixtures
from oslo_config import fixture as config from oslo_config import fixture as config
from oslotest import base as test_base from oslotest import base as test_base
@ -21,19 +25,33 @@ from oslo_policy import _checks
from oslo_policy import policy from oslo_policy import policy
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..', 'tests/var'))
class PolicyBaseTestCase(test_base.BaseTestCase): class PolicyBaseTestCase(test_base.BaseTestCase):
def setUp(self): def setUp(self):
super(PolicyBaseTestCase, self).setUp() super(PolicyBaseTestCase, self).setUp()
self.conf = self.useFixture(config.Config()).conf self.conf = self.useFixture(config.Config()).conf
self.conf(args=['--config-dir', TEST_VAR_DIR]) self.config_dir = self.useFixture(fixtures.TempDir()).path
self.conf(args=['--config-dir', self.config_dir])
self.enforcer = policy.Enforcer(self.conf) self.enforcer = policy.Enforcer(self.conf)
self.addCleanup(self.enforcer.clear) self.addCleanup(self.enforcer.clear)
def get_config_file_fullname(self, filename):
return os.path.join(self.config_dir, filename.lstrip(os.sep))
def create_config_file(self, filename, contents):
"""Create a configuration file under the config dir.
Also creates any intermediate paths needed so the file can be
in a subdirectory.
"""
path = self.get_config_file_fullname(filename)
pardir = os.path.dirname(path)
if not os.path.exists(pardir):
os.makedirs(pardir)
with codecs.open(path, 'w', encoding='utf-8') as f:
f.write(contents)
class FakeCheck(_checks.BaseCheck): class FakeCheck(_checks.BaseCheck):
def __init__(self, result=None): def __init__(self, result=None):

View File

@ -27,6 +27,32 @@ from oslo_policy import policy
from oslo_policy.tests import base from oslo_policy.tests import base
POLICY_A_CONTENTS = """
{
"default": "role:fakeA"
}
"""
POLICY_B_CONTENTS = """
{
"default": "role:fakeB"
}
"""
POLICY_FAKE_CONTENTS = """
{
"default": "role:fakeC"
}
"""
POLICY_JSON_CONTENTS = """
{
"default": "rule:admin",
"admin": "is_admin:True"
}
"""
class MyException(Exception): class MyException(Exception):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.args = args self.args = args
@ -106,6 +132,17 @@ class RulesTestCase(test_base.BaseTestCase):
class EnforcerTest(base.PolicyBaseTestCase): class EnforcerTest(base.PolicyBaseTestCase):
def setUp(self):
super(EnforcerTest, self).setUp()
self.create_config_file('policy.json', POLICY_JSON_CONTENTS)
def check_loaded_files(self, filenames):
self.assertEqual(
self.enforcer._loaded_files,
[self.get_config_file_fullname(n)
for n in filenames]
)
def test_load_file(self): def test_load_file(self):
self.conf.set_override('policy_dirs', [], group='oslo_policy') self.conf.set_override('policy_dirs', [], group='oslo_policy')
self.enforcer.load_rules(True) self.enforcer.load_rules(True)
@ -113,19 +150,24 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertIn('default', self.enforcer.rules) self.assertIn('default', self.enforcer.rules)
self.assertIn('admin', self.enforcer.rules) self.assertIn('admin', self.enforcer.rules)
@mock.patch('oslo_policy.policy.LOG') def test_load_directory(self):
def test_load_directory(self, mock_log): self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
self.enforcer.load_rules(True) self.enforcer.load_rules(True)
self.assertIsNotNone(self.enforcer.rules) self.assertIsNotNone(self.enforcer.rules)
loaded_rules = jsonutils.loads(str(self.enforcer.rules)) loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeB', loaded_rules['default']) self.assertEqual('role:fakeB', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin']) self.assertEqual('is_admin:True', loaded_rules['admin'])
# 3 debug calls showing loading of policy.json, self.check_loaded_files([
# policy.d/a.conf, policy.d/b.conf 'policy.json',
self.assertEqual(mock_log.debug.call_count, 3) 'policy.d/a.conf',
'policy.d/b.conf',
])
@mock.patch('oslo_policy.policy.LOG') def test_load_multiple_directories(self):
def test_load_multiple_directories(self, mock_log): self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
self.create_config_file('policy.2.d/fake.conf', POLICY_FAKE_CONTENTS)
self.conf.set_override('policy_dirs', self.conf.set_override('policy_dirs',
['policy.d', 'policy.2.d'], ['policy.d', 'policy.2.d'],
group='oslo_policy') group='oslo_policy')
@ -134,12 +176,15 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules)) loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeC', loaded_rules['default']) self.assertEqual('role:fakeC', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin']) self.assertEqual('is_admin:True', loaded_rules['admin'])
# 4 debug calls showing loading of policy.json, self.check_loaded_files([
# policy.d/a.conf, policy.d/b.conf, policy.2.d/fake.conf 'policy.json',
self.assertEqual(mock_log.debug.call_count, 4) 'policy.d/a.conf',
'policy.d/b.conf',
'policy.2.d/fake.conf',
])
@mock.patch('oslo_policy.policy.LOG') def test_load_non_existed_directory(self):
def test_load_non_existed_directory(self, mock_log): self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.conf.set_override('policy_dirs', self.conf.set_override('policy_dirs',
['policy.d', 'policy.x.d'], ['policy.d', 'policy.x.d'],
group='oslo_policy') group='oslo_policy')
@ -147,9 +192,7 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertIsNotNone(self.enforcer.rules) self.assertIsNotNone(self.enforcer.rules)
self.assertIn('default', self.enforcer.rules) self.assertIn('default', self.enforcer.rules)
self.assertIn('admin', self.enforcer.rules) self.assertIn('admin', self.enforcer.rules)
# 3 debug calls showing loading of policy.json, self.check_loaded_files(['policy.json', 'policy.d/a.conf'])
# policy.d/a.conf, policy.d/b.conf
self.assertEqual(mock_log.debug.call_count, 3)
def test_set_rules_type(self): def test_set_rules_type(self):
self.assertRaises(TypeError, self.assertRaises(TypeError,
@ -185,13 +228,16 @@ class EnforcerTest(base.PolicyBaseTestCase):
}""" }"""
rules = policy.Rules.load_json(rules_json) rules = policy.Rules.load_json(rules_json)
default_rule = _checks.TrueCheck() default_rule = _checks.TrueCheck()
enforcer = policy.Enforcer(cfg.CONF, default_rule=default_rule) enforcer = policy.Enforcer(self.conf, default_rule=default_rule)
enforcer.set_rules(rules) enforcer.set_rules(rules)
action = 'cloudwatch:PutMetricData' action = 'cloudwatch:PutMetricData'
creds = {'roles': ''} creds = {'roles': ''}
self.assertEqual(enforcer.enforce(action, {}, creds), True) self.assertEqual(enforcer.enforce(action, {}, creds), True)
def test_enforcer_force_reload_with_overwrite(self): def test_enforcer_force_reload_with_overwrite(self):
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
# Prepare in memory fake policies. # Prepare in memory fake policies.
self.enforcer.set_rules({'test': _parser.parse_rule('role:test')}, self.enforcer.set_rules({'test': _parser.parse_rule('role:test')},
use_conf=True) use_conf=True)
@ -220,6 +266,9 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertIn('is_admin:True', loaded_rules['admin']) self.assertIn('is_admin:True', loaded_rules['admin'])
def test_enforcer_force_reload_without_overwrite(self): def test_enforcer_force_reload_without_overwrite(self):
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
# Prepare in memory fake policies. # Prepare in memory fake policies.
self.enforcer.set_rules({'test': _parser.parse_rule('role:test')}, self.enforcer.set_rules({'test': _parser.parse_rule('role:test')},
use_conf=True) use_conf=True)
@ -251,9 +300,12 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertIn('is_admin:True', loaded_rules['admin']) self.assertIn('is_admin:True', loaded_rules['admin'])
def test_enforcer_keep_use_conf_flag_after_reload(self): def test_enforcer_keep_use_conf_flag_after_reload(self):
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
# We initialized enforcer with # We initialized enforcer with
# policy configure files. # policy configure files.
enforcer = policy.Enforcer(cfg.CONF) enforcer = policy.Enforcer(self.conf)
self.assertTrue(enforcer.use_conf) self.assertTrue(enforcer.use_conf)
self.assertTrue(enforcer.enforce('default', {}, self.assertTrue(enforcer.enforce('default', {},
{'roles': ['fakeB']})) {'roles': ['fakeB']}))
@ -269,19 +321,12 @@ class EnforcerTest(base.PolicyBaseTestCase):
# enforcer(), this case could happen only # enforcer(), this case could happen only
# when use_conf flag equals True. # when use_conf flag equals True.
rules = jsonutils.loads(str(enforcer.rules)) rules = jsonutils.loads(str(enforcer.rules))
with open(enforcer.policy_path, 'r') as f:
ori_rules = f.read()
def _remove_dynamic_test_rule():
with open(enforcer.policy_path, 'w') as f:
f.write(ori_rules)
self.addCleanup(_remove_dynamic_test_rule)
rules['_dynamic_test_rule'] = 'role:test' rules['_dynamic_test_rule'] = 'role:test'
with open(enforcer.policy_path, 'w') as f: with open(enforcer.policy_path, 'w') as f:
f.write(jsonutils.dumps(rules)) f.write(jsonutils.dumps(rules))
enforcer.load_rules(force_reload=True)
self.assertTrue(enforcer.enforce('_dynamic_test_rule', {}, self.assertTrue(enforcer.enforce('_dynamic_test_rule', {},
{'roles': ['test']})) {'roles': ['test']}))
@ -304,16 +349,16 @@ class EnforcerTest(base.PolicyBaseTestCase):
'test1': 'test1'}) 'test1': 'test1'})
def test_enforcer_with_default_policy_file(self): def test_enforcer_with_default_policy_file(self):
enforcer = policy.Enforcer(cfg.CONF) enforcer = policy.Enforcer(self.conf)
self.assertEqual(cfg.CONF.oslo_policy.policy_file, self.assertEqual(self.conf.oslo_policy.policy_file,
enforcer.policy_file) enforcer.policy_file)
def test_enforcer_with_policy_file(self): def test_enforcer_with_policy_file(self):
enforcer = policy.Enforcer(cfg.CONF, policy_file='non-default.json') enforcer = policy.Enforcer(self.conf, policy_file='non-default.json')
self.assertEqual('non-default.json', enforcer.policy_file) self.assertEqual('non-default.json', enforcer.policy_file)
def test_get_policy_path_raises_exc(self): def test_get_policy_path_raises_exc(self):
enforcer = policy.Enforcer(cfg.CONF, policy_file='raise_error.json') enforcer = policy.Enforcer(self.conf, policy_file='raise_error.json')
e = self.assertRaises(cfg.ConfigFilesNotFoundError, e = self.assertRaises(cfg.ConfigFilesNotFoundError,
enforcer._get_policy_path, enforcer.policy_file) enforcer._get_policy_path, enforcer.policy_file)
self.assertEqual(('raise_error.json', ), e.config_files) self.assertEqual(('raise_error.json', ), e.config_files)
@ -325,26 +370,30 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertEqual(self.enforcer.rules, {'test': 'test1'}) self.assertEqual(self.enforcer.rules, {'test': 'test1'})
def test_enforcer_default_rule_name(self): def test_enforcer_default_rule_name(self):
enforcer = policy.Enforcer(cfg.CONF, default_rule='foo_rule') enforcer = policy.Enforcer(self.conf, default_rule='foo_rule')
self.assertEqual('foo_rule', enforcer.rules.default_rule) self.assertEqual('foo_rule', enforcer.rules.default_rule)
self.conf.set_override('policy_default_rule', 'bar_rule', self.conf.set_override('policy_default_rule', 'bar_rule',
group='oslo_policy') group='oslo_policy')
enforcer = policy.Enforcer(cfg.CONF, default_rule='foo_rule') enforcer = policy.Enforcer(self.conf, default_rule='foo_rule')
self.assertEqual('foo_rule', enforcer.rules.default_rule) self.assertEqual('foo_rule', enforcer.rules.default_rule)
enforcer = policy.Enforcer(cfg.CONF, ) enforcer = policy.Enforcer(self.conf, )
self.assertEqual('bar_rule', enforcer.rules.default_rule) self.assertEqual('bar_rule', enforcer.rules.default_rule)
class CheckFunctionTestCase(base.PolicyBaseTestCase): class CheckFunctionTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(CheckFunctionTestCase, self).setUp()
self.create_config_file('policy.json', POLICY_JSON_CONTENTS)
def test_check_explicit(self): def test_check_explicit(self):
rule = base.FakeCheck() rule = base.FakeCheck()
result = self.enforcer.enforce(rule, 'target', 'creds') result = self.enforcer.enforce(rule, 'target', 'creds')
self.assertEqual(result, ('target', 'creds', self.enforcer)) self.assertEqual(result, ('target', 'creds', self.enforcer))
def test_check_no_rules(self): def test_check_no_rules(self):
self.conf.set_override('policy_file', 'empty.json', # Clear the policy.json file created in setUp()
group='oslo_policy') self.create_config_file('policy.json', "{}")
self.enforcer.default_rule = None self.enforcer.default_rule = None
self.enforcer.load_rules() self.enforcer.load_rules()
result = self.enforcer.enforce('rule', 'target', 'creds') result = self.enforcer.enforce('rule', 'target', 'creds')

View File

@ -1,3 +0,0 @@
{
"default": "role:fakeC"
}

View File

@ -1,3 +0,0 @@
{
"default": "role:fakeA"
}

View File

@ -1,3 +0,0 @@
{
"default": "role:fakeB"
}

View File

@ -1,4 +0,0 @@
{
"default": "rule:admin",
"admin": "is_admin:True"
}