External Policy hook should support SSL
We have a HttpCheck that can check rules by calling a remote server. However currently it just supports "http://" and not "https://". In this patch, we add a HttpsCheck so we can support "https://" urls as well. In addition to ensure security, we add options to: * Specify a ca cert to validate the server * Specify cert/key to allow server to validate the client Another quirk was that currently the payload is being sent using 'application/x-www-form-urlencoded' as the content-type. We add another option to send the payload as 'application/json'. Add this same support to the HttpCheck as well. Change-Id: Ic9c5249555fb45010a92432a504c84ad3fbb9ea1
This commit is contained in:
parent
70ba1beb3e
commit
89d226916c
@ -20,11 +20,14 @@ import ast
|
||||
import contextlib
|
||||
import copy
|
||||
import inspect
|
||||
import os
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
import six
|
||||
|
||||
from oslo_policy._i18n import _
|
||||
|
||||
|
||||
registered_checks = {}
|
||||
|
||||
@ -292,11 +295,86 @@ class HttpCheck(Check):
|
||||
element = target.get(key)
|
||||
if type(element) is object:
|
||||
temp_target[key] = {}
|
||||
data = {'rule': jsonutils.dumps(current_rule),
|
||||
'target': jsonutils.dumps(temp_target),
|
||||
'credentials': jsonutils.dumps(creds)}
|
||||
with contextlib.closing(requests.post(url, data=data)) as r:
|
||||
return r.text == 'True'
|
||||
|
||||
data = json = None
|
||||
if (enforcer.conf.oslo_policy.remote_content_type ==
|
||||
'application/x-www-form-urlencoded'):
|
||||
data = {'rule': jsonutils.dumps(current_rule),
|
||||
'target': jsonutils.dumps(temp_target),
|
||||
'credentials': jsonutils.dumps(creds)}
|
||||
else:
|
||||
json = {'rule': current_rule,
|
||||
'target': temp_target,
|
||||
'credentials': creds}
|
||||
|
||||
with contextlib.closing(requests.post(url, json=json, data=data)) as r:
|
||||
return r.text.lstrip('"').rstrip('"') == 'True'
|
||||
|
||||
|
||||
@register('https')
|
||||
class HttpsCheck(Check):
|
||||
"""Check ``https:`` rules by calling to a remote server.
|
||||
|
||||
This example implementation simply verifies that the response
|
||||
is exactly ``True``.
|
||||
"""
|
||||
|
||||
def __call__(self, target, creds, enforcer, current_rule=None):
|
||||
url = ('https:' + self.match) % target
|
||||
|
||||
cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
|
||||
key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
|
||||
ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
|
||||
verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
|
||||
|
||||
if cert_file:
|
||||
if not os.path.exists(cert_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ssl cert_file : %s") % cert_file)
|
||||
if not os.access(cert_file, os.R_OK):
|
||||
raise RuntimeError(
|
||||
_("Unable to access ssl cert_file : %s") % cert_file)
|
||||
if key_file:
|
||||
if not os.path.exists(key_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ssl key_file : %s") % key_file)
|
||||
if not os.access(key_file, os.R_OK):
|
||||
raise RuntimeError(
|
||||
_("Unable to access ssl key_file : %s") % key_file)
|
||||
cert = (cert_file, key_file)
|
||||
if verify_server:
|
||||
if ca_crt_file:
|
||||
if not os.path.exists(ca_crt_file):
|
||||
raise RuntimeError(
|
||||
_("Unable to find ca cert_file : %s") % ca_crt_file)
|
||||
verify_server = ca_crt_file
|
||||
|
||||
# Convert instances of object() in target temporarily to
|
||||
# empty dict to avoid circular reference detection
|
||||
# errors in jsonutils.dumps().
|
||||
temp_target = copy.deepcopy(target)
|
||||
for key in target.keys():
|
||||
element = target.get(key)
|
||||
if type(element) is object:
|
||||
temp_target[key] = {}
|
||||
|
||||
data = json = None
|
||||
if (enforcer.conf.oslo_policy.remote_content_type ==
|
||||
'application/x-www-form-urlencoded'):
|
||||
data = {'rule': jsonutils.dumps(current_rule),
|
||||
'target': jsonutils.dumps(temp_target),
|
||||
'credentials': jsonutils.dumps(creds)}
|
||||
else:
|
||||
json = {'rule': current_rule,
|
||||
'target': temp_target,
|
||||
'credentials': creds}
|
||||
|
||||
with contextlib.closing(
|
||||
requests.post(url, json=json,
|
||||
data=data, cert=cert,
|
||||
verify=verify_server)
|
||||
) as r:
|
||||
return r.text.lstrip('"').rstrip('"') == 'True'
|
||||
|
||||
|
||||
@register(None)
|
||||
|
@ -10,7 +10,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
__all__ = ['HttpCheckFixture']
|
||||
__all__ = ['HttpCheckFixture', 'HttpsCheckFixture']
|
||||
|
||||
import fixtures
|
||||
|
||||
@ -40,3 +40,30 @@ class HttpCheckFixture(fixtures.Fixture):
|
||||
mocked_call,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class HttpsCheckFixture(fixtures.Fixture):
|
||||
"""Helps short circuit the external http call"""
|
||||
|
||||
def __init__(self, return_value=True):
|
||||
"""Initialize the fixture.
|
||||
|
||||
:param return_value: True implies the policy check passed and False
|
||||
implies that the policy check failed
|
||||
:type return_value: boolean
|
||||
"""
|
||||
super(HttpsCheckFixture, self).__init__()
|
||||
self.return_value = return_value
|
||||
|
||||
def setUp(self):
|
||||
super(HttpsCheckFixture, self).setUp()
|
||||
|
||||
def mocked_call(target, cred, enforcer, rule):
|
||||
return self.return_value
|
||||
|
||||
self.useFixture(
|
||||
fixtures.MonkeyPatch(
|
||||
'oslo_policy._checks.HttpCheck.__call__',
|
||||
mocked_call,
|
||||
)
|
||||
)
|
||||
|
@ -44,6 +44,25 @@ _options = [
|
||||
'be searched. Missing or empty directories are '
|
||||
'ignored.'),
|
||||
deprecated_group='DEFAULT'),
|
||||
cfg.StrOpt('remote_content_type',
|
||||
choices=('application/x-www-form-urlencoded',
|
||||
'application/json'),
|
||||
default='application/x-www-form-urlencoded',
|
||||
help=_("Content Type to send and receive data for "
|
||||
"REST based policy check")),
|
||||
cfg.BoolOpt('remote_ssl_verify_server_crt',
|
||||
help=_("server identity verification for REST based "
|
||||
"policy check"),
|
||||
default=False),
|
||||
cfg.StrOpt('remote_ssl_ca_crt_file',
|
||||
help=_("Absolute path to ca cert file for REST based "
|
||||
"policy check")),
|
||||
cfg.StrOpt('remote_ssl_client_crt_file',
|
||||
help=_("Absolute path to client cert for REST based "
|
||||
"policy check")),
|
||||
cfg.StrOpt('remote_ssl_client_key_file',
|
||||
help=_("Absolute path client key file REST based "
|
||||
"policy check")),
|
||||
]
|
||||
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from oslotest import base as test_base
|
||||
@ -20,6 +21,7 @@ from requests_mock.contrib import fixture as rm_fixture
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from oslo_policy import _checks
|
||||
from oslo_policy import opts
|
||||
from oslo_policy.tests import base
|
||||
from oslo_policy.tests import token_fixture
|
||||
|
||||
@ -98,6 +100,7 @@ class HttpCheckTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HttpCheckTestCase, self).setUp()
|
||||
opts._register(self.conf)
|
||||
self.requests_mock = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
def decode_post_data(self, post_data):
|
||||
@ -117,11 +120,33 @@ class HttpCheckTestCase(base.PolicyBaseTestCase):
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/x-www-form-urlencoded',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
|
||||
rule=None),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_accept_json(self):
|
||||
self.conf.set_override('remote_content_type', 'application/json',
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('http://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpCheck('http', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/json',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
credentials=cred_dict,
|
||||
target=target_dict),
|
||||
json.loads(last_request.body.decode('utf-8')))
|
||||
|
||||
def test_reject(self):
|
||||
self.requests_mock.post("http://example.com/target", text='other')
|
||||
|
||||
@ -194,6 +219,169 @@ class HttpCheckTestCase(base.PolicyBaseTestCase):
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
|
||||
class HttpsCheckTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HttpsCheckTestCase, self).setUp()
|
||||
opts._register(self.conf)
|
||||
self.requests_mock = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
def decode_post_data(self, post_data):
|
||||
result = {}
|
||||
for item in post_data.split('&'):
|
||||
key, _sep, value = item.partition('=')
|
||||
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
|
||||
return result
|
||||
|
||||
def test_https_accept(self):
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/x-www-form-urlencoded',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_json(self):
|
||||
self.conf.set_override('remote_content_type', 'application/json',
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('application/json',
|
||||
last_request.headers['Content-Type'])
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
json.loads(last_request.body.decode('utf-8')))
|
||||
|
||||
def test_https_accept_with_verify(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual(True, last_request.verify)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_with_verify_cert(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
with mock.patch('os.path.exists') as path_exists:
|
||||
path_exists.return_value = True
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('ca.crt', last_request.verify)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_accept_with_verify_and_client_certs(self):
|
||||
self.conf.set_override('remote_ssl_verify_server_crt', True,
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_client_key_file', "client.key",
|
||||
group='oslo_policy')
|
||||
self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
|
||||
group='oslo_policy')
|
||||
self.requests_mock.post('https://example.com/target', text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
with mock.patch('os.path.exists') as path_exists:
|
||||
with mock.patch('os.access') as os_access:
|
||||
path_exists.return_value = True
|
||||
os_access.return_value = True
|
||||
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('ca.crt', last_request.verify)
|
||||
self.assertEqual(('client.crt', 'client.key'), last_request.cert)
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_reject(self):
|
||||
self.requests_mock.post("https://example.com/target", text='other')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
|
||||
target_dict = dict(name='target', spam='spammer')
|
||||
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
|
||||
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
|
||||
|
||||
last_request = self.requests_mock.last_request
|
||||
self.assertEqual('POST', last_request.method)
|
||||
self.assertEqual(dict(rule=None,
|
||||
target=target_dict,
|
||||
credentials=cred_dict),
|
||||
self.decode_post_data(last_request.body))
|
||||
|
||||
def test_https_with_objects_in_target(self):
|
||||
self.requests_mock.post("https://example.com/target", text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
target = {'a': object(),
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
def test_https_with_strings_in_target(self):
|
||||
self.requests_mock.post("https://example.com/target", text='True')
|
||||
|
||||
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
|
||||
target = {'a': 'some_string',
|
||||
'name': 'target',
|
||||
'b': 'test data'}
|
||||
self.assertTrue(check(target,
|
||||
dict(user='user', roles=['a', 'b', 'c']),
|
||||
self.enforcer))
|
||||
|
||||
|
||||
class GenericCheckTestCase(base.PolicyBaseTestCase):
|
||||
def test_no_cred(self):
|
||||
check = _checks.GenericCheck('name', '%(name)s')
|
||||
|
@ -35,3 +35,21 @@ class FixtureTestCase(test_base.PolicyBaseTestCase):
|
||||
return self.enforcer.enforce(rule=action,
|
||||
target={},
|
||||
creds={})
|
||||
|
||||
def test_enforce_https_true(self):
|
||||
self.assertTrue(self._test_enforce_http(True))
|
||||
|
||||
def test_enforce_https_false(self):
|
||||
self.assertFalse(self._test_enforce_http(False))
|
||||
|
||||
def _test_enforce_https(self, return_value):
|
||||
self.useFixture(fixture.HttpsCheckFixture(return_value=return_value))
|
||||
action = self.getUniqueString()
|
||||
rules_json = {
|
||||
action: "https:" + self.getUniqueString()
|
||||
}
|
||||
rules = oslo_policy.Rules.load(json.dumps(rules_json))
|
||||
self.enforcer.set_rules(rules)
|
||||
return self.enforcer.enforce(rule=action,
|
||||
target={},
|
||||
creds={})
|
||||
|
Loading…
x
Reference in New Issue
Block a user