Merge "http/https check rules as stevedore extensions"
This commit is contained in:
commit
e673152a64
@ -17,19 +17,25 @@
|
||||
|
||||
import abc
|
||||
import ast
|
||||
import contextlib
|
||||
import copy
|
||||
import inspect
|
||||
import os
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
import six
|
||||
|
||||
from oslo_policy._i18n import _
|
||||
|
||||
import stevedore
|
||||
|
||||
registered_checks = {}
|
||||
extension_checks = None
|
||||
|
||||
|
||||
def get_extensions():
|
||||
global extension_checks
|
||||
if extension_checks is None:
|
||||
em = stevedore.ExtensionManager('oslo.policy.rule_checks',
|
||||
invoke_on_load=False)
|
||||
extension_checks = {
|
||||
extension.name: extension.plugin
|
||||
for extension in em
|
||||
}
|
||||
return extension_checks
|
||||
|
||||
|
||||
def _check(rule, target, creds, enforcer, current_rule):
|
||||
@ -276,107 +282,6 @@ class RoleCheck(Check):
|
||||
return False
|
||||
|
||||
|
||||
@register('http')
|
||||
class HttpCheck(Check):
|
||||
"""Check ``http:`` rules by calling to a remote server.
|
||||
|
||||
This example implementation simply verifies that the response
|
||||
is exactly ``True``.
|
||||
"""
|
||||
|
||||
def __call__(self, target, creds, enforcer, current_rule=None):
|
||||
url = ('http:' + self.match) % target
|
||||
|
||||
# Convert instances of object() in target temporarily to
|
||||
# empty dict to avoid circular reference detection
|
||||
# errors in jsonutils.dumps().
|
||||
temp_target = copy.deepcopy(target)
|
||||
for key in target.keys():
|
||||
element = target.get(key)
|
||||
if type(element) is object:
|
||||
temp_target[key] = {}
|
||||
|
||||
data = json = None
|
||||
if (enforcer.conf.oslo_policy.remote_content_type ==
|
||||
'application/x-www-form-urlencoded'):
|
||||
data = {'rule': jsonutils.dumps(current_rule),
|
||||
'target': jsonutils.dumps(temp_target),
|
||||
'credentials': jsonutils.dumps(creds)}
|
||||
else:
|
||||
json = {'rule': current_rule,
|
||||
'target': temp_target,
|
||||
'credentials': creds}
|
||||
|
||||
with contextlib.closing(requests.post(url, json=json, data=data)) as r:
|
||||
return r.text.lstrip('"').rstrip('"') == 'True'
|
||||
|
||||
|
||||
@register('https')
|
||||
class HttpsCheck(Check):
|
||||
"""Check ``https:`` rules by calling to a remote server.
|
||||
|
||||
This example implementation simply verifies that the response
|
||||
is exactly ``True``.
|
||||
"""
|
||||
|
||||
def __call__(self, target, creds, enforcer, current_rule=None):
|
||||
url = ('https:' + self.match) % target
|
||||
|
||||
cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
|
||||
key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
|
||||
ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
|
||||
verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
|
||||
|
||||
if cert_file:
|
||||
if not os.path.exists(cert_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ssl cert_file : %s") % cert_file)
|
||||
if not os.access(cert_file, os.R_OK):
|
||||
raise RuntimeError(
|
||||
_("Unable to access ssl cert_file : %s") % cert_file)
|
||||
if key_file:
|
||||
if not os.path.exists(key_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ssl key_file : %s") % key_file)
|
||||
if not os.access(key_file, os.R_OK):
|
||||
raise RuntimeError(
|
||||
_("Unable to access ssl key_file : %s") % key_file)
|
||||
cert = (cert_file, key_file)
|
||||
if verify_server:
|
||||
if ca_crt_file:
|
||||
if not os.path.exists(ca_crt_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ca cert_file : %s") % ca_crt_file)
|
||||
verify_server = ca_crt_file
|
||||
|
||||
# Convert instances of object() in target temporarily to
|
||||
# empty dict to avoid circular reference detection
|
||||
# errors in jsonutils.dumps().
|
||||
temp_target = copy.deepcopy(target)
|
||||
for key in target.keys():
|
||||
element = target.get(key)
|
||||
if type(element) is object:
|
||||
temp_target[key] = {}
|
||||
|
||||
data = json = None
|
||||
if (enforcer.conf.oslo_policy.remote_content_type ==
|
||||
'application/x-www-form-urlencoded'):
|
||||
data = {'rule': jsonutils.dumps(current_rule),
|
||||
'target': jsonutils.dumps(temp_target),
|
||||
'credentials': jsonutils.dumps(creds)}
|
||||
else:
|
||||
json = {'rule': current_rule,
|
||||
'target': temp_target,
|
||||
'credentials': creds}
|
||||
|
||||
with contextlib.closing(
|
||||
requests.post(url, json=json,
|
||||
data=data, cert=cert,
|
||||
verify=verify_server)
|
||||
) as r:
|
||||
return r.text.lstrip('"').rstrip('"') == 'True'
|
||||
|
||||
|
||||
@register(None)
|
||||
class GenericCheck(Check):
|
||||
"""Check an individual match.
|
||||
|
111
oslo_policy/_external.py
Normal file
111
oslo_policy/_external.py
Normal file
@ -0,0 +1,111 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2015 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
import os
|
||||
|
||||
from oslo_policy import _checks
|
||||
from oslo_policy._i18n import _
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
|
||||
|
||||
class HttpCheck(_checks.Check):
|
||||
"""Check ``http:`` rules by calling to a remote server.
|
||||
|
||||
This example implementation simply verifies that the response
|
||||
is exactly ``True``.
|
||||
"""
|
||||
|
||||
def __call__(self, target, creds, enforcer, current_rule=None):
|
||||
url = ('http:' + self.match) % target
|
||||
data, json = self._construct_payload(creds, current_rule,
|
||||
enforcer, target)
|
||||
with contextlib.closing(
|
||||
requests.post(url, json=json, data=data)
|
||||
) as r:
|
||||
return r.text.lstrip('"').rstrip('"') == 'True'
|
||||
|
||||
@staticmethod
|
||||
def _construct_payload(creds, current_rule, enforcer, target):
|
||||
# Convert instances of object() in target temporarily to
|
||||
# empty dict to avoid circular reference detection
|
||||
# errors in jsonutils.dumps().
|
||||
temp_target = copy.deepcopy(target)
|
||||
for key in target.keys():
|
||||
element = target.get(key)
|
||||
if type(element) is object:
|
||||
temp_target[key] = {}
|
||||
data = json = None
|
||||
if (enforcer.conf.oslo_policy.remote_content_type ==
|
||||
'application/x-www-form-urlencoded'):
|
||||
data = {'rule': jsonutils.dumps(current_rule),
|
||||
'target': jsonutils.dumps(temp_target),
|
||||
'credentials': jsonutils.dumps(creds)}
|
||||
else:
|
||||
json = {'rule': current_rule,
|
||||
'target': temp_target,
|
||||
'credentials': creds}
|
||||
return data, json
|
||||
|
||||
|
||||
class HttpsCheck(HttpCheck):
|
||||
"""Check ``https:`` rules by calling to a remote server.
|
||||
|
||||
This example implementation simply verifies that the response
|
||||
is exactly ``True``.
|
||||
"""
|
||||
|
||||
def __call__(self, target, creds, enforcer, current_rule=None):
|
||||
url = ('https:' + self.match) % target
|
||||
|
||||
cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
|
||||
key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
|
||||
ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
|
||||
verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
|
||||
|
||||
if cert_file:
|
||||
if not os.path.exists(cert_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ssl cert_file : %s") % cert_file)
|
||||
if not os.access(cert_file, os.R_OK):
|
||||
raise RuntimeError(
|
||||
_("Unable to access ssl cert_file : %s") % cert_file)
|
||||
if key_file:
|
||||
if not os.path.exists(key_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ssl key_file : %s") % key_file)
|
||||
if not os.access(key_file, os.R_OK):
|
||||
raise RuntimeError(
|
||||
_("Unable to access ssl key_file : %s") % key_file)
|
||||
cert = (cert_file, key_file)
|
||||
if verify_server:
|
||||
if ca_crt_file:
|
||||
if not os.path.exists(ca_crt_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ca cert_file : %s") % ca_crt_file)
|
||||
verify_server = ca_crt_file
|
||||
|
||||
data, json = self._construct_payload(creds, current_rule,
|
||||
enforcer, target)
|
||||
with contextlib.closing(
|
||||
requests.post(url, json=json,
|
||||
data=data, cert=cert,
|
||||
verify=verify_server)
|
||||
) as r:
|
||||
return r.text.lstrip('"').rstrip('"') == 'True'
|
@ -216,7 +216,10 @@ def _parse_check(rule):
|
||||
return _checks.FalseCheck()
|
||||
|
||||
# Find what implements the check
|
||||
if kind in _checks.registered_checks:
|
||||
extension_checks = _checks.get_extensions()
|
||||
if kind in extension_checks:
|
||||
return extension_checks[kind](kind, match)
|
||||
elif kind in _checks.registered_checks:
|
||||
return _checks.registered_checks[kind](kind, match)
|
||||
elif None in _checks.registered_checks:
|
||||
return _checks.registered_checks[None](kind, match)
|
||||
|
@ -36,7 +36,7 @@ class HttpCheckFixture(fixtures.Fixture):
|
||||
|
||||
self.useFixture(
|
||||
fixtures.MonkeyPatch(
|
||||
'oslo_policy._checks.HttpCheck.__call__',
|
||||
'oslo_policy._external.HttpCheck.__call__',
|
||||
mocked_call,
|
||||
)
|
||||
)
|
||||
@ -63,7 +63,7 @@ class HttpsCheckFixture(fixtures.Fixture):
|
||||
|
||||
self.useFixture(
|
||||
fixtures.MonkeyPatch(
|
||||
'oslo_policy._checks.HttpCheck.__call__',
|
||||
'oslo_policy._external.HttpsCheck.__call__',
|
||||
mocked_call,
|
||||
)
|
||||
)
|
||||
|
@ -13,15 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from oslotest import base as test_base
|
||||
from requests_mock.contrib import fixture as rm_fixture
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from oslo_policy import _checks
|
||||
from oslo_policy import opts
|
||||
from oslo_policy.tests import base
|
||||
from oslo_policy.tests import token_fixture
|
||||
|
||||
@ -96,292 +91,6 @@ class RoleCheckTestCase(base.PolicyBaseTestCase):
|
||||
self.assertFalse(check({}, {}, self.enforcer))
|
||||
|
||||
|
||||
class HttpCheckTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HttpCheckTestCase, self).setUp()
|
||||
opts._register(self.conf)
|
||||
self.requests_mock = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
def decode_post_data(self, post_data):
|
||||
result = {}
|
||||
for item in post_data.split('&'):
|
||||
key, _sep, value = item.partition('=')
|
||||
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
|
||||
return result
|
||||
|
||||
def test_accept(self):
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/x-www-form-urlencoded',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=None),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_accept_json(self):
|
||||
self.conf.set_override('remote_content_type', 'application/json',
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/json',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
credentials=cred_dict,
|
||||
target=target_dict),
|
||||
json.loads(last_request.body.decode('utf-8')))
|
||||
|
||||
def test_reject(self):
|
||||
self.requests_mock.post("http://example.com/target", text='other')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=None),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_http_with_objects_in_target(self):
|
||||
self.requests_mock.post("http://example.com/target", text='True')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
target = {'a': object(),
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_http_with_strings_in_target(self):
|
||||
self.requests_mock.post("http://example.com/target", text='True')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
target = {'a': 'some_string',
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_accept_with_rule_in_argument(self):
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
current_rule = "a_rule"
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer,
|
||||
current_rule))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=current_rule),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_reject_with_rule_in_argument(self):
|
||||
self.requests_mock.post("http://example.com/target", text='other')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
current_rule = "a_rule"
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer,
|
||||
current_rule))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=current_rule),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
|
||||
class HttpsCheckTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HttpsCheckTestCase, self).setUp()
|
||||
opts._register(self.conf)
|
||||
self.requests_mock = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
def decode_post_data(self, post_data):
|
||||
result = {}
|
||||
for item in post_data.split('&'):
|
||||
key, _sep, value = item.partition('=')
|
||||
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
|
||||
return result
|
||||
|
||||
def test_https_accept(self):
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/x-www-form-urlencoded',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_json(self):
|
||||
self.conf.set_override('remote_content_type', 'application/json',
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/json',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
json.loads(last_request.body.decode('utf-8')))
|
||||
|
||||
def test_https_accept_with_verify(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual(True, last_request.verify)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_with_verify_cert(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
with mock.patch('os.path.exists') as path_exists:
|
||||
path_exists.return_value = True
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('ca.crt', last_request.verify)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_with_verify_and_client_certs(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_client_key_file', "client.key",
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
with mock.patch('os.path.exists') as path_exists:
|
||||
with mock.patch('os.access') as os_access:
|
||||
path_exists.return_value = True
|
||||
os_access.return_value = True
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('ca.crt', last_request.verify)
|
||||
self.assertEqual(('client.crt', 'client.key'), last_request.cert)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_reject(self):
|
||||
self.requests_mock.post("https://example.com/target", text='other')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_with_objects_in_target(self):
|
||||
self.requests_mock.post("https://example.com/target", text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
target = {'a': object(),
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_https_with_strings_in_target(self):
|
||||
self.requests_mock.post("https://example.com/target", text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
target = {'a': 'some_string',
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
|
||||
class GenericCheckTestCase(base.PolicyBaseTestCase):
|
||||
def test_no_cred(self):
|
||||
check = _checks.GenericCheck('name', '%(name)s')
|
||||
|
310
oslo_policy/tests/test_external.py
Normal file
310
oslo_policy/tests/test_external.py
Normal file
@ -0,0 +1,310 @@
|
||||
# Copyright (c) 2015 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from requests_mock.contrib import fixture as rm_fixture
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from oslo_policy import _external
|
||||
from oslo_policy import opts
|
||||
from oslo_policy.tests import base
|
||||
|
||||
|
||||
class HttpCheckTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HttpCheckTestCase, self).setUp()
|
||||
opts._register(self.conf)
|
||||
self.requests_mock = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
def decode_post_data(self, post_data):
|
||||
result = {}
|
||||
for item in post_data.split('&'):
|
||||
key, _sep, value = item.partition('=')
|
||||
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
|
||||
return result
|
||||
|
||||
def test_accept(self):
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/x-www-form-urlencoded',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=None),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_accept_json(self):
|
||||
self.conf.set_override('remote_content_type', 'application/json',
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/json',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
credentials=cred_dict,
|
||||
target=target_dict),
|
||||
json.loads(last_request.body.decode('utf-8')))
|
||||
|
||||
def test_reject(self):
|
||||
self.requests_mock.post("http://example.com/target", text='other')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=None),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_http_with_objects_in_target(self):
|
||||
self.requests_mock.post("http://example.com/target", text='True')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
target = {'a': object(),
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_http_with_strings_in_target(self):
|
||||
self.requests_mock.post("http://example.com/target", text='True')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
target = {'a': 'some_string',
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_accept_with_rule_in_argument(self):
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
current_rule = "a_rule"
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer,
|
||||
current_rule))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=current_rule),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_reject_with_rule_in_argument(self):
|
||||
self.requests_mock.post("http://example.com/target", text='other')
|
||||
|
||||
check = _external.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
current_rule = "a_rule"
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer,
|
||||
current_rule))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=current_rule),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
|
||||
class HttpsCheckTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HttpsCheckTestCase, self).setUp()
|
||||
opts._register(self.conf)
|
||||
self.requests_mock = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
def decode_post_data(self, post_data):
|
||||
result = {}
|
||||
for item in post_data.split('&'):
|
||||
key, _sep, value = item.partition('=')
|
||||
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
|
||||
return result
|
||||
|
||||
def test_https_accept(self):
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/x-www-form-urlencoded',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_json(self):
|
||||
self.conf.set_override('remote_content_type', 'application/json',
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/json',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
json.loads(last_request.body.decode('utf-8')))
|
||||
|
||||
def test_https_accept_with_verify(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual(True, last_request.verify)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_with_verify_cert(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
with mock.patch('os.path.exists') as path_exists:
|
||||
path_exists.return_value = True
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('ca.crt', last_request.verify)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_with_verify_and_client_certs(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_client_key_file', "client.key",
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
with mock.patch('os.path.exists') as path_exists:
|
||||
with mock.patch('os.access') as os_access:
|
||||
path_exists.return_value = True
|
||||
os_access.return_value = True
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('ca.crt', last_request.verify)
|
||||
self.assertEqual(('client.crt', 'client.key'), last_request.cert)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_reject(self):
|
||||
self.requests_mock.post("https://example.com/target", text='other')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_with_objects_in_target(self):
|
||||
self.requests_mock.post("https://example.com/target", text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
target = {'a': object(),
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_https_with_strings_in_target(self):
|
||||
self.requests_mock.post("https://example.com/target", text='True')
|
||||
|
||||
check = _external.HttpsCheck('https', '//example.com/%(name)s')
|
||||
target = {'a': 'some_string',
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
@ -39,6 +39,10 @@ console_scripts =
|
||||
oslopolicy-policy-generator = oslo_policy.generator:generate_policy
|
||||
oslopolicy-list-redundant = oslo_policy.generator:list_redundant
|
||||
|
||||
oslo.policy.rule_checks =
|
||||
http = oslo_policy._external:HttpCheck
|
||||
https = oslo_policy._external:HttpsCheck
|
||||
|
||||
[build_sphinx]
|
||||
all-files = 1
|
||||
warning-is-error = 1
|
||||
|
Loading…
Reference in New Issue
Block a user