![Stephen Finucane](/assets/img/avatar_default.png)
Many of requests' APIs accept a 'verify' parameter which can be a boolean value or a path to either a CA cert bundle or a directory of CA certs. If 'verify=True' is set, requests will look for two environment variables, 'REQUESTS_CA_BUNDLE' and 'CURL_CA_BUNDLE', that, if set, should specify a path to CA certs. If either of these is found, 'requests' overrides the 'True' value with the value of the environment variable [1]. From the docs [2]: This list of trusted CAs can also be specified through the REQUESTS_CA_BUNDLE environment variable. If REQUESTS_CA_BUNDLE is not set, CURL_CA_BUNDLE will be used as fallback. This can cause test failures on environments where either of these is set. Ensure this doesn't happen by using the 'EnvironmentVariable' fixture to unset these environment variables. [1] https://github.com/psf/requests/blob/v2.25.0/requests/sessions.py#L717-L719 [2] https://requests.readthedocs.io/en/master/user/advanced/#ssl-cert-verification Change-Id: I808c9102b214aa25144e88e7773a9890ab0a5bdc Signed-off-by: Stephen Finucane <sfinucan@redhat.com>
320 lines
14 KiB
Python
320 lines
14 KiB
Python
# Copyright (c) 2015 OpenStack Foundation.
|
|
# All Rights Reserved.
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import json
|
|
from unittest import mock
|
|
|
|
import fixtures
|
|
from oslo_serialization import jsonutils
|
|
from requests_mock.contrib import fixture as rm_fixture
|
|
from urllib import parse as urlparse
|
|
|
|
from oslo_policy import _external
|
|
from oslo_policy import opts
|
|
from oslo_policy.tests import base
|
|
|
|
|
|
class HttpCheckTestCase(base.PolicyBaseTestCase):
    """Exercise ``_external.HttpCheck`` against a mocked HTTP endpoint."""

    def setUp(self):
        """Register the policy options and install the requests mock."""
        super().setUp()
        opts._register(self.conf)
        self.requests_mock = self.useFixture(rm_fixture.Fixture())

    def decode_post_data(self, post_data):
        """Turn a form-encoded body into a dict of JSON-decoded values.

        Each ``&``-separated field is split on its first ``=``; the value
        part is URL-unquoted (``+`` treated as a space) and parsed as JSON.
        """
        fields = (chunk.partition('=') for chunk in post_data.split('&'))
        return {
            name: jsonutils.loads(urlparse.unquote_plus(encoded))
            for name, _eq, encoded in fields
        }

    def test_accept(self):
        # The endpoint answers 'True', so the check must evaluate truthy and
        # the request must be a form-encoded POST carrying the inputs.
        self.requests_mock.post('http://example.com/target', text='True')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('application/x-www-form-urlencoded',
                         request.headers['Content-Type'])
        self.assertEqual('POST', request.method)
        expected = {'target': target, 'credentials': creds, 'rule': None}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_accept_json(self):
        # Same as test_accept, but with remote_content_type overridden to
        # JSON; the body is then decoded as a UTF-8 JSON document.
        self.conf.set_override('remote_content_type', 'application/json',
                               group='oslo_policy')
        self.requests_mock.post('http://example.com/target', text='True')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('application/json',
                         request.headers['Content-Type'])
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'credentials': creds, 'target': target}
        self.assertEqual(expected,
                         json.loads(request.body.decode('utf-8')))

    def test_reject(self):
        # Any response body other than 'True' (here 'other') must make the
        # check evaluate falsy.
        self.requests_mock.post("http://example.com/target", text='other')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertFalse(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('POST', request.method)
        expected = {'target': target, 'credentials': creds, 'rule': None}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_http_with_objects_in_target(self):
        # The target carries a bare object() instance next to plain strings;
        # the check is still expected to pass.
        self.requests_mock.post("http://example.com/target", text='True')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {
            'a': object(),
            'name': 'target',
            'b': 'test data',
        }
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

    def test_http_with_strings_in_target(self):
        # Control case for the test above: every target value is a string.
        self.requests_mock.post("http://example.com/target", text='True')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {
            'a': 'some_string',
            'name': 'target',
            'b': 'test data',
        }
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

    def test_accept_with_rule_in_argument(self):
        # When a rule name is passed as the fourth argument it must be
        # forwarded in the POST body under the 'rule' key.
        self.requests_mock.post('http://example.com/target', text='True')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        rule_name = "a_rule"
        self.assertTrue(check(target, creds, self.enforcer, rule_name))

        request = self.requests_mock.last_request
        self.assertEqual('POST', request.method)
        expected = {'target': target, 'credentials': creds,
                    'rule': rule_name}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_reject_with_rule_in_argument(self):
        # Rejecting variant of the test above: the rule is still forwarded
        # even though the endpoint answers 'other'.
        self.requests_mock.post("http://example.com/target", text='other')
        check = _external.HttpCheck('http', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        rule_name = "a_rule"
        self.assertFalse(check(target, creds, self.enforcer, rule_name))

        request = self.requests_mock.last_request
        self.assertEqual('POST', request.method)
        expected = {'target': target, 'credentials': creds,
                    'rule': rule_name}
        self.assertEqual(expected, self.decode_post_data(request.body))
|
|
|
|
|
|
class HttpsCheckTestCase(base.PolicyBaseTestCase):
    """Exercise ``_external.HttpsCheck`` against a mocked HTTPS endpoint."""

    def setUp(self):
        """Register policy options, mock requests and clear CA env vars."""
        super().setUp()
        opts._register(self.conf)
        self.requests_mock = self.useFixture(rm_fixture.Fixture())

        # ensure environment variables don't mess with our test results
        # https://requests.readthedocs.io/en/master/user/advanced/#ssl-cert-verification
        for env_var in ('REQUESTS_CA_BUNDLE', 'CURL_CA_BUNDLE'):
            self.useFixture(fixtures.EnvironmentVariable(env_var))

    def decode_post_data(self, post_data):
        """Turn a form-encoded body into a dict of JSON-decoded values.

        Each ``&``-separated field is split on its first ``=``; the value
        part is URL-unquoted (``+`` treated as a space) and parsed as JSON.
        """
        fields = (chunk.partition('=') for chunk in post_data.split('&'))
        return {
            name: jsonutils.loads(urlparse.unquote_plus(encoded))
            for name, _eq, encoded in fields
        }

    def test_https_accept(self):
        # The endpoint answers 'True', so the check must evaluate truthy and
        # the request must be a form-encoded POST carrying the inputs.
        self.requests_mock.post('https://example.com/target', text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('application/x-www-form-urlencoded',
                         request.headers['Content-Type'])
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'target': target, 'credentials': creds}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_https_accept_json(self):
        # Same as test_https_accept, but with remote_content_type overridden
        # to JSON; the body is then decoded as a UTF-8 JSON document.
        self.conf.set_override('remote_content_type', 'application/json',
                               group='oslo_policy')
        self.requests_mock.post('https://example.com/target', text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('application/json',
                         request.headers['Content-Type'])
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'target': target, 'credentials': creds}
        self.assertEqual(expected,
                         json.loads(request.body.decode('utf-8')))

    def test_https_accept_with_verify(self):
        # Server certificate verification enabled with no CA file set: the
        # outgoing request is expected to carry verify=True.
        overrides = (
            ('remote_ssl_verify_server_crt', True),
            ('remote_ssl_ca_crt_file', None),
        )
        for option, value in overrides:
            self.conf.set_override(option, value, group='oslo_policy')
        self.requests_mock.post('https://example.com/target', text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual(True, request.verify)
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'target': target, 'credentials': creds}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_https_accept_with_verify_cert(self):
        # Verification enabled with a CA file configured: the configured
        # path is expected to be passed through as the verify value.
        overrides = (
            ('remote_ssl_verify_server_crt', True),
            ('remote_ssl_ca_crt_file', "ca.crt"),
        )
        for option, value in overrides:
            self.conf.set_override(option, value, group='oslo_policy')
        self.requests_mock.post('https://example.com/target', text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        # "ca.crt" does not exist on disk, so os.path.exists is patched to
        # report that it does while the check runs.
        with mock.patch('os.path.exists', return_value=True):
            self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('ca.crt', request.verify)
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'target': target, 'credentials': creds}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_https_accept_with_verify_and_client_certs(self):
        # CA file plus client key/cert configured: the request is expected
        # to carry the CA path as verify and a (crt, key) tuple as cert.
        overrides = (
            ('remote_ssl_verify_server_crt', True),
            ('remote_ssl_ca_crt_file', "ca.crt"),
            ('remote_ssl_client_key_file', "client.key"),
            ('remote_ssl_client_crt_file', "client.crt"),
        )
        for option, value in overrides:
            self.conf.set_override(option, value, group='oslo_policy')
        self.requests_mock.post('https://example.com/target', text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        # None of the configured files exist, so both the existence and the
        # access checks are patched to succeed while the check runs.
        with mock.patch('os.path.exists', return_value=True), \
                mock.patch('os.access', return_value=True):
            self.assertTrue(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('ca.crt', request.verify)
        self.assertEqual(('client.crt', 'client.key'), request.cert)
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'target': target, 'credentials': creds}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_https_reject(self):
        # Any response body other than 'True' (here 'other') must make the
        # check evaluate falsy.
        self.requests_mock.post("https://example.com/target", text='other')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {'name': 'target', 'spam': 'spammer'}
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertFalse(check(target, creds, self.enforcer))

        request = self.requests_mock.last_request
        self.assertEqual('POST', request.method)
        expected = {'rule': None, 'target': target, 'credentials': creds}
        self.assertEqual(expected, self.decode_post_data(request.body))

    def test_https_with_objects_in_target(self):
        # The target carries a bare object() instance next to plain strings;
        # the check is still expected to pass.
        self.requests_mock.post("https://example.com/target", text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {
            'a': object(),
            'name': 'target',
            'b': 'test data',
        }
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))

    def test_https_with_strings_in_target(self):
        # Control case for the test above: every target value is a string.
        self.requests_mock.post("https://example.com/target", text='True')
        check = _external.HttpsCheck('https', '//example.com/%(name)s')

        target = {
            'a': 'some_string',
            'name': 'target',
            'b': 'test data',
        }
        creds = {'user': 'user', 'roles': ['a', 'b', 'c']}
        self.assertTrue(check(target, creds, self.enforcer))
|