oslo.policy/oslo_policy/_external.py
Davanum Srinivas 7e185ad96b http/https check rules as stevedore extensions
Why? HttpCheck/HttpsCheck are examples of rule checks
that can be implemented outside of the oslo.policy
library. Once we setup the infra for registering and
using these as stevedore extensions, we automatically
get the capability of other folks contributing to
writing custom rules for their own use cases.

* Add HttpCheck/HttpsCheck as entrypoints in setup.cfg
* parser will check get_extensions() to see if there
  are any external checks registered
* Move HttpCheck/HttpsCheck into external module
* Move related test cases to test_external.py

Change-Id: Icde2b26a38d7c7842defae053228d9208454b969
2017-10-05 08:40:47 -04:00

112 lines
4.2 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import os
from oslo_policy import _checks
from oslo_policy._i18n import _
from oslo_serialization import jsonutils
import requests
class HttpCheck(_checks.Check):
"""Check ``http:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer, current_rule=None):
url = ('http:' + self.match) % target
data, json = self._construct_payload(creds, current_rule,
enforcer, target)
with contextlib.closing(
requests.post(url, json=json, data=data)
) as r:
return r.text.lstrip('"').rstrip('"') == 'True'
@staticmethod
def _construct_payload(creds, current_rule, enforcer, target):
# Convert instances of object() in target temporarily to
# empty dict to avoid circular reference detection
# errors in jsonutils.dumps().
temp_target = copy.deepcopy(target)
for key in target.keys():
element = target.get(key)
if type(element) is object:
temp_target[key] = {}
data = json = None
if (enforcer.conf.oslo_policy.remote_content_type ==
'application/x-www-form-urlencoded'):
data = {'rule': jsonutils.dumps(current_rule),
'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
else:
json = {'rule': current_rule,
'target': temp_target,
'credentials': creds}
return data, json
class HttpsCheck(HttpCheck):
"""Check ``https:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer, current_rule=None):
url = ('https:' + self.match) % target
cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
if cert_file:
if not os.path.exists(cert_file):
raise RuntimeError(
_("Unable to find ssl cert_file : %s") % cert_file)
if not os.access(cert_file, os.R_OK):
raise RuntimeError(
_("Unable to access ssl cert_file : %s") % cert_file)
if key_file:
if not os.path.exists(key_file):
raise RuntimeError(
_("Unable to find ssl key_file : %s") % key_file)
if not os.access(key_file, os.R_OK):
raise RuntimeError(
_("Unable to access ssl key_file : %s") % key_file)
cert = (cert_file, key_file)
if verify_server:
if ca_crt_file:
if not os.path.exists(ca_crt_file):
raise RuntimeError(
_("Unable to find ca cert_file : %s") % ca_crt_file)
verify_server = ca_crt_file
data, json = self._construct_payload(creds, current_rule,
enforcer, target)
with contextlib.closing(
requests.post(url, json=json,
data=data, cert=cert,
verify=verify_server)
) as r:
return r.text.lstrip('"').rstrip('"') == 'True'