Add s3token middleware to the swift3 project
This patch moves (as discussed at the Newton design summit) the s3_token middleware from keystonemiddleware to swift3. The git history is not included based upon the agreement between the Keystone team and the Swift3 team. This is based on s3_token.py from openstack/keystonemiddleware@234913e Note that the egg entrypoint has changed from "s3_token" to "s3token" for consistency between entrypoint and recommended pipeline names. Additionally, keystone functional tests now use the in-tree s3token middleware. Upgrade Impact ============== Deployers currently using keystone for authentication should change their s3token filter definition to use the middleware provided by swift3 rather than the one provided by keystonemiddleware. Note that keystonemiddleware will still need to be installed, and its auth_token middleware configured. UpgradeImpact Co-Authored-By: Tim Burke <tim.burke@gmail.com> Co-Authored-By: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp> Change-Id: I1c0e68a5276dd3dee97d7569e477c784db8ccb8a
This commit is contained in:
parent
ca0874307d
commit
b626a3ca86
@ -57,7 +57,7 @@ If you use keystone:
|
||||
pipeline = catch_errors cache swift3 s3token authtoken keystoneauth slo proxy-server
|
||||
|
||||
Note:
|
||||
* The s3token and authtoken filters require the keystonemiddleware package.
|
||||
* The authtoken filter requires the keystonemiddleware package.
|
||||
* Swift3 explicitly checks that keystoneauth is in the pipeline. You must use this name
|
||||
in the pipeline statement and in [filter:keystoneauth] section header.
|
||||
|
||||
@ -69,10 +69,8 @@ Note:
|
||||
You also need to add the following if you use keystone (adjust port, host, protocol configurations for your environment):
|
||||
|
||||
[filter:s3token]
|
||||
paste.filter_factory = keystonemiddleware.s3_token:filter_factory
|
||||
auth_port = 35357
|
||||
auth_host = 127.0.0.1
|
||||
auth_protocol = http
|
||||
use = egg:swift3#s3token
|
||||
auth_uri = http://127.0.0.1:35357/
|
||||
|
||||
|
||||
4) Swift3 config options:
|
||||
|
@ -141,15 +141,13 @@ use = egg:swift#memcache
|
||||
|
||||
[filter:s3token]
|
||||
# See swift manual for more details.
|
||||
paste.filter_factory = keystonemiddleware.s3_token:filter_factory
|
||||
use = egg:swift3#s3token
|
||||
|
||||
# Prefix that will be prepended to the tenant to form the account
|
||||
reseller_prefix = AUTH_
|
||||
|
||||
# Keystone server details
|
||||
auth_host = keystonehost
|
||||
auth_port = 35357
|
||||
auth_protocol = http
|
||||
auth_uri = http://keystonehost:35357/
|
||||
|
||||
# SSL-related options
|
||||
#insecure = False
|
||||
|
@ -1,2 +1,4 @@
|
||||
swift>=2.1.0
|
||||
lxml
|
||||
requests!=2.9.0,>=2.8.1 # Apache-2.0
|
||||
six>=1.9.0
|
||||
|
@ -29,6 +29,7 @@ packages =
|
||||
[entry_points]
|
||||
paste.filter_factory =
|
||||
swift3 = swift3.middleware:filter_factory
|
||||
s3token = swift3.s3_token_middleware:filter_factory
|
||||
|
||||
[nosetests]
|
||||
exe = 1
|
||||
|
228
swift3/s3_token_middleware.py
Normal file
228
swift3/s3_token_middleware.py
Normal file
@ -0,0 +1,228 @@
|
||||
# Copyright 2012 OpenStack Foundation
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2011,2012 Akira YOSHIYAMA <akirayoshiyama@gmail.com>
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# This source code is based ./auth_token.py and ./ec2_token.py.
|
||||
# See them for their copyright.
|
||||
|
||||
"""
|
||||
S3 Token Middleware
|
||||
|
||||
This WSGI component:
|
||||
|
||||
* Gets a request from the swift3 middleware with an S3 Authorization
|
||||
access key.
|
||||
* Validates s3 token in Keystone.
|
||||
* Transforms the account name to AUTH_%(tenant_name).
|
||||
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import requests
|
||||
import six
|
||||
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.utils import config_true_value, split_path
|
||||
|
||||
|
||||
PROTOCOL_NAME = 'S3 Token Authentication'
|
||||
|
||||
|
||||
class ServiceError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class S3Token(object):
|
||||
"""Middleware that handles S3 authentication."""
|
||||
|
||||
def __init__(self, app, conf):
|
||||
"""Common initialization code."""
|
||||
self._app = app
|
||||
self._logger = logging.getLogger(conf.get('log_name', __name__))
|
||||
self._logger.debug('Starting the %s component', PROTOCOL_NAME)
|
||||
self._reseller_prefix = conf.get('reseller_prefix', 'AUTH_')
|
||||
# where to find the auth service (we use this to validate tokens)
|
||||
|
||||
self._request_uri = conf.get('auth_uri')
|
||||
if not self._request_uri:
|
||||
self._logger.warning(
|
||||
"Use of the auth_host, auth_port, and auth_protocol "
|
||||
"configuration options was deprecated in the Newton release "
|
||||
"in favor of auth_uri. These options may be removed in a "
|
||||
"future release.")
|
||||
auth_host = conf.get('auth_host')
|
||||
auth_port = int(conf.get('auth_port', 35357))
|
||||
auth_protocol = conf.get('auth_protocol', 'https')
|
||||
|
||||
self._request_uri = '%s://%s:%s' % (auth_protocol, auth_host,
|
||||
auth_port)
|
||||
self._request_uri = self._request_uri.rstrip('/')
|
||||
|
||||
# SSL
|
||||
insecure = config_true_value(conf.get('insecure'))
|
||||
cert_file = conf.get('certfile')
|
||||
key_file = conf.get('keyfile')
|
||||
|
||||
if insecure:
|
||||
self._verify = False
|
||||
elif cert_file and key_file:
|
||||
self._verify = (cert_file, key_file)
|
||||
elif cert_file:
|
||||
self._verify = cert_file
|
||||
else:
|
||||
self._verify = None
|
||||
|
||||
def _deny_request(self, code):
|
||||
error_table = {
|
||||
'AccessDenied': (401, 'Access denied'),
|
||||
'InvalidURI': (400, 'Could not parse the specified URI'),
|
||||
}
|
||||
resp = Response(content_type='text/xml')
|
||||
resp.status = error_table[code][0]
|
||||
error_msg = ('<?xml version="1.0" encoding="UTF-8"?>\r\n'
|
||||
'<Error>\r\n <Code>%s</Code>\r\n '
|
||||
'<Message>%s</Message>\r\n</Error>\r\n' %
|
||||
(code, error_table[code][1]))
|
||||
if six.PY3:
|
||||
error_msg = error_msg.encode()
|
||||
resp.body = error_msg
|
||||
return resp
|
||||
|
||||
def _json_request(self, creds_json):
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
try:
|
||||
response = requests.post('%s/v2.0/s3tokens' % self._request_uri,
|
||||
headers=headers, data=creds_json,
|
||||
verify=self._verify)
|
||||
except requests.exceptions.RequestException as e:
|
||||
self._logger.info('HTTP connection exception: %s', e)
|
||||
resp = self._deny_request('InvalidURI')
|
||||
raise ServiceError(resp)
|
||||
|
||||
if response.status_code < 200 or response.status_code >= 300:
|
||||
self._logger.debug('Keystone reply error: status=%s reason=%s',
|
||||
response.status_code, response.reason)
|
||||
resp = self._deny_request('AccessDenied')
|
||||
raise ServiceError(resp)
|
||||
|
||||
return response
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
"""Handle incoming request. authenticate and send downstream."""
|
||||
req = Request(environ)
|
||||
self._logger.debug('Calling S3Token middleware.')
|
||||
|
||||
try:
|
||||
parts = split_path(req.path, 1, 4, True)
|
||||
version, account, container, obj = parts
|
||||
except ValueError:
|
||||
msg = 'Not a path query, skipping.'
|
||||
self._logger.debug(msg)
|
||||
return self._app(environ, start_response)
|
||||
|
||||
# Read request signature and access id.
|
||||
if 'Authorization' not in req.headers:
|
||||
msg = 'No Authorization header. skipping.'
|
||||
self._logger.debug(msg)
|
||||
return self._app(environ, start_response)
|
||||
|
||||
token = req.headers.get('X-Auth-Token',
|
||||
req.headers.get('X-Storage-Token'))
|
||||
if not token:
|
||||
msg = 'You did not specify an auth or a storage token. skipping.'
|
||||
self._logger.debug(msg)
|
||||
return self._app(environ, start_response)
|
||||
|
||||
auth_header = req.headers['Authorization']
|
||||
try:
|
||||
access, signature = auth_header.split(' ')[-1].rsplit(':', 1)
|
||||
except ValueError:
|
||||
msg = 'You have an invalid Authorization header: %s'
|
||||
self._logger.debug(msg, auth_header)
|
||||
return self._deny_request('InvalidURI')(environ, start_response)
|
||||
|
||||
# NOTE(chmou): This is to handle the special case with nova
|
||||
# when we have the option s3_affix_tenant. We will force it to
|
||||
# connect to another account than the one
|
||||
# authenticated. Before people start getting worried about
|
||||
# security, I should point that we are connecting with
|
||||
# username/token specified by the user but instead of
|
||||
# connecting to its own account we will force it to go to an
|
||||
# another account. In a normal scenario if that user don't
|
||||
# have the reseller right it will just fail but since the
|
||||
# reseller account can connect to every account it is allowed
|
||||
# by the swift_auth middleware.
|
||||
force_tenant = None
|
||||
if ':' in access:
|
||||
access, force_tenant = access.split(':')
|
||||
|
||||
# Authenticate request.
|
||||
creds = {'credentials': {'access': access,
|
||||
'token': token,
|
||||
'signature': signature}}
|
||||
creds_json = json.dumps(creds)
|
||||
self._logger.debug('Connecting to Keystone sending this JSON: %s',
|
||||
creds_json)
|
||||
# NOTE(vish): We could save a call to keystone by having
|
||||
# keystone return token, tenant, user, and roles
|
||||
# from this call.
|
||||
#
|
||||
# NOTE(chmou): We still have the same problem we would need to
|
||||
# change token_auth to detect if we already
|
||||
# identified and not doing a second query and just
|
||||
# pass it through to swiftauth in this case.
|
||||
try:
|
||||
resp = self._json_request(creds_json)
|
||||
except ServiceError as e:
|
||||
resp = e.args[0] # NB: swob.Response, not requests.Response
|
||||
msg = 'Received error, exiting middleware with error: %s'
|
||||
self._logger.debug(msg, resp.status_int)
|
||||
return resp(environ, start_response)
|
||||
|
||||
self._logger.debug('Keystone Reply: Status: %d, Output: %s',
|
||||
resp.status_code, resp.content)
|
||||
|
||||
try:
|
||||
identity_info = resp.json()
|
||||
token_id = str(identity_info['access']['token']['id'])
|
||||
tenant = identity_info['access']['token']['tenant']
|
||||
except (ValueError, KeyError):
|
||||
error = 'Error on keystone reply: %d %s'
|
||||
self._logger.debug(error, resp.status_code, resp.content)
|
||||
return self._deny_request('InvalidURI')(environ, start_response)
|
||||
|
||||
req.headers['X-Auth-Token'] = token_id
|
||||
tenant_to_connect = force_tenant or tenant['id']
|
||||
if six.PY2 and isinstance(tenant_to_connect, six.text_type):
|
||||
tenant_to_connect = tenant_to_connect.encode('utf-8')
|
||||
self._logger.debug('Connecting with tenant: %s', tenant_to_connect)
|
||||
new_tenant_name = '%s%s' % (self._reseller_prefix, tenant_to_connect)
|
||||
environ['PATH_INFO'] = environ['PATH_INFO'].replace(account,
|
||||
new_tenant_name)
|
||||
return self._app(environ, start_response)
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
"""Returns a WSGI filter app for use with paste.deploy."""
|
||||
conf = global_conf.copy()
|
||||
conf.update(local_conf)
|
||||
|
||||
def auth_filter(app):
|
||||
return S3Token(app, conf)
|
||||
return auth_filter
|
@ -47,11 +47,8 @@ use = egg:swift#gatekeeper
|
||||
use = egg:swift#memcache
|
||||
|
||||
[filter:s3token]
|
||||
paste.filter_factory = keystonemiddleware.s3_token:filter_factory
|
||||
auth_host = localhost
|
||||
auth_port = 35357
|
||||
auth_protocol = http
|
||||
auth_uri = http://localhost:5000/
|
||||
use = egg:swift3#s3token
|
||||
auth_uri = http://localhost:35357/
|
||||
|
||||
[filter:authtoken]
|
||||
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
|
||||
|
298
swift3/test/unit/test_s3_token_middleware.py
Normal file
298
swift3/test/unit/test_s3_token_middleware.py
Normal file
@ -0,0 +1,298 @@
|
||||
# Copyright 2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import unittest
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import requests
|
||||
from requests_mock.contrib import fixture as rm_fixture
|
||||
from six.moves import urllib
|
||||
|
||||
from swift3 import s3_token_middleware as s3_token
|
||||
from swift.common.swob import Request, Response
|
||||
|
||||
|
||||
GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
|
||||
'tenant': {'id': 'TENANT_ID'}}}}
|
||||
|
||||
|
||||
class TestResponse(requests.Response):
|
||||
"""Utility class to wrap requests.Response.
|
||||
|
||||
Class used to wrap requests.Response and provide some convenience to
|
||||
initialize with a dict.
|
||||
"""
|
||||
|
||||
def __init__(self, data):
|
||||
self._text = None
|
||||
super(TestResponse, self).__init__()
|
||||
if isinstance(data, dict):
|
||||
self.status_code = data.get('status_code', 200)
|
||||
headers = data.get('headers')
|
||||
if headers:
|
||||
self.headers.update(headers)
|
||||
# Fake the text attribute to streamline Response creation
|
||||
# _content is defined by requests.Response
|
||||
self._content = data.get('text')
|
||||
else:
|
||||
self.status_code = data
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.__dict__ == other.__dict__
|
||||
|
||||
@property
|
||||
def text(self):
|
||||
return self.content
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
"""This represents a WSGI app protected by the auth_token middleware."""
|
||||
def __call__(self, env, start_response):
|
||||
resp = Response()
|
||||
resp.environ = env
|
||||
return resp(env, start_response)
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestBase(unittest.TestCase):
|
||||
|
||||
TEST_AUTH_URI = 'https://fakehost/identity'
|
||||
TEST_URL = '%s/v2.0/s3tokens' % (TEST_AUTH_URI, )
|
||||
TEST_DOMAIN_ID = '1'
|
||||
TEST_DOMAIN_NAME = 'aDomain'
|
||||
TEST_GROUP_ID = uuid.uuid4().hex
|
||||
TEST_ROLE_ID = uuid.uuid4().hex
|
||||
TEST_TENANT_ID = '1'
|
||||
TEST_TENANT_NAME = 'aTenant'
|
||||
TEST_TOKEN = 'aToken'
|
||||
TEST_TRUST_ID = 'aTrust'
|
||||
TEST_USER = 'test'
|
||||
TEST_USER_ID = uuid.uuid4().hex
|
||||
|
||||
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
|
||||
|
||||
def setUp(self):
|
||||
super(S3TokenMiddlewareTestBase, self).setUp()
|
||||
self.logger = fixtures.FakeLogger(level=logging.DEBUG)
|
||||
self.logger.setUp()
|
||||
self.time_patcher = mock.patch.object(time, 'time', lambda: 1234)
|
||||
self.time_patcher.start()
|
||||
|
||||
self.conf = {
|
||||
'auth_uri': self.TEST_AUTH_URI,
|
||||
}
|
||||
|
||||
self.requests_mock = rm_fixture.Fixture()
|
||||
self.requests_mock.setUp()
|
||||
|
||||
def tearDown(self):
|
||||
self.requests_mock.cleanUp()
|
||||
self.time_patcher.stop()
|
||||
self.logger.cleanUp()
|
||||
super(S3TokenMiddlewareTestBase, self).tearDown()
|
||||
|
||||
def start_fake_response(self, status, headers):
|
||||
self.response_status = int(status.split(' ', 1)[0])
|
||||
self.response_headers = dict(headers)
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(S3TokenMiddlewareTestGood, self).setUp()
|
||||
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
|
||||
|
||||
self.requests_mock.post(self.TEST_URL,
|
||||
status_code=201,
|
||||
json=GOOD_RESPONSE)
|
||||
|
||||
# Ignore the request and pass to the next middleware in the
|
||||
# pipeline if no path has been specified.
|
||||
def test_no_path_request(self):
|
||||
req = Request.blank('/')
|
||||
self.middleware(req.environ, self.start_fake_response)
|
||||
self.assertEqual(self.response_status, 200)
|
||||
|
||||
# Ignore the request and pass to the next middleware in the
|
||||
# pipeline if no Authorization header has been specified
|
||||
def test_without_authorization(self):
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
self.middleware(req.environ, self.start_fake_response)
|
||||
self.assertEqual(self.response_status, 200)
|
||||
|
||||
def test_without_auth_storage_token(self):
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS badboy'
|
||||
self.middleware(req.environ, self.start_fake_response)
|
||||
self.assertEqual(self.response_status, 200)
|
||||
|
||||
def test_authorized(self):
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
|
||||
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
|
||||
|
||||
def test_authorized_http(self):
|
||||
protocol = 'http'
|
||||
host = 'fakehost'
|
||||
port = 35357
|
||||
self.requests_mock.post(
|
||||
'%s://%s:%s/v2.0/s3tokens' % (protocol, host, port),
|
||||
status_code=201, json=GOOD_RESPONSE)
|
||||
|
||||
self.middleware = (
|
||||
s3_token.filter_factory({'auth_protocol': 'http',
|
||||
'auth_host': host,
|
||||
'auth_port': port})(FakeApp()))
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
|
||||
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
|
||||
|
||||
def test_authorized_trailing_slash(self):
|
||||
self.middleware = s3_token.filter_factory({
|
||||
'auth_uri': self.TEST_AUTH_URI + '/'})(FakeApp())
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
|
||||
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
|
||||
|
||||
def test_authorization_nova_toconnect(self):
|
||||
req = Request.blank('/v1/AUTH_swiftint/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:FORCED_TENANT_ID:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
path = req.environ['PATH_INFO']
|
||||
self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID'))
|
||||
|
||||
@mock.patch.object(requests, 'post')
|
||||
def test_insecure(self, MOCK_REQUEST):
|
||||
self.middleware = (
|
||||
s3_token.filter_factory({'insecure': 'True'})(FakeApp()))
|
||||
|
||||
text_return_value = json.dumps(GOOD_RESPONSE)
|
||||
MOCK_REQUEST.return_value = TestResponse({
|
||||
'status_code': 201,
|
||||
'text': text_return_value})
|
||||
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
|
||||
self.assertTrue(MOCK_REQUEST.called)
|
||||
mock_args, mock_kwargs = MOCK_REQUEST.call_args
|
||||
self.assertIs(mock_kwargs['verify'], False)
|
||||
|
||||
def test_insecure_option(self):
|
||||
# insecure is passed as a string.
|
||||
|
||||
# Some non-secure values.
|
||||
true_values = ['true', 'True', '1', 'yes']
|
||||
for val in true_values:
|
||||
config = {'insecure': val, 'certfile': 'false_ind'}
|
||||
middleware = s3_token.filter_factory(config)(FakeApp())
|
||||
self.assertIs(False, middleware._verify)
|
||||
|
||||
# Some "secure" values, including unexpected value.
|
||||
false_values = ['false', 'False', '0', 'no', 'someweirdvalue']
|
||||
for val in false_values:
|
||||
config = {'insecure': val, 'certfile': 'false_ind'}
|
||||
middleware = s3_token.filter_factory(config)(FakeApp())
|
||||
self.assertEqual('false_ind', middleware._verify)
|
||||
|
||||
# Default is secure.
|
||||
config = {'certfile': 'false_ind'}
|
||||
middleware = s3_token.filter_factory(config)(FakeApp())
|
||||
self.assertIs('false_ind', middleware._verify)
|
||||
|
||||
def test_unicode_path(self):
|
||||
url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8')
|
||||
req = Request.blank(urllib.parse.quote(url))
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
|
||||
def setUp(self):
|
||||
super(S3TokenMiddlewareTestBad, self).setUp()
|
||||
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
|
||||
|
||||
def test_unauthorized_token(self):
|
||||
ret = {"error":
|
||||
{"message": "EC2 access key not found.",
|
||||
"code": 401,
|
||||
"title": "Unauthorized"}}
|
||||
self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
resp = req.get_response(self.middleware)
|
||||
s3_denied_req = self.middleware._deny_request('AccessDenied')
|
||||
self.assertEqual(resp.body, s3_denied_req.body)
|
||||
self.assertEqual(
|
||||
resp.status_int, # pylint: disable-msg=E1101
|
||||
s3_denied_req.status_int) # pylint: disable-msg=E1101
|
||||
|
||||
def test_bogus_authorization(self):
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS badboy'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
resp = req.get_response(self.middleware)
|
||||
self.assertEqual(resp.status_int, 400) # pylint: disable-msg=E1101
|
||||
s3_invalid_req = self.middleware._deny_request('InvalidURI')
|
||||
self.assertEqual(resp.body, s3_invalid_req.body)
|
||||
self.assertEqual(
|
||||
resp.status_int, # pylint: disable-msg=E1101
|
||||
s3_invalid_req.status_int) # pylint: disable-msg=E1101
|
||||
|
||||
def test_fail_to_connect_to_keystone(self):
|
||||
with mock.patch.object(self.middleware, '_json_request') as o:
|
||||
s3_invalid_req = self.middleware._deny_request('InvalidURI')
|
||||
o.side_effect = s3_token.ServiceError(s3_invalid_req)
|
||||
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
resp = req.get_response(self.middleware)
|
||||
self.assertEqual(resp.body, s3_invalid_req.body)
|
||||
self.assertEqual(
|
||||
resp.status_int, # pylint: disable-msg=E1101
|
||||
s3_invalid_req.status_int) # pylint: disable-msg=E1101
|
||||
|
||||
def test_bad_reply(self):
|
||||
self.requests_mock.post(self.TEST_URL,
|
||||
status_code=201,
|
||||
text="<badreply>")
|
||||
|
||||
req = Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'AWS access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
resp = req.get_response(self.middleware)
|
||||
s3_invalid_req = self.middleware._deny_request('InvalidURI')
|
||||
self.assertEqual(resp.body, s3_invalid_req.body)
|
||||
self.assertEqual(
|
||||
resp.status_int, # pylint: disable-msg=E1101
|
||||
s3_invalid_req.status_int) # pylint: disable-msg=E1101
|
@ -7,4 +7,5 @@ mock
|
||||
pylint
|
||||
python-openstackclient<=1.9.0
|
||||
boto
|
||||
six>=1.9.0
|
||||
requests-mock>=0.7.0 # Apache-2.0
|
||||
fixtures<2.0,>=1.3.1 # Apache-2.0/BSD
|
||||
|
Loading…
x
Reference in New Issue
Block a user