swift/test/unit/common/middleware/test_formpost.py
Konrad Kügler f3f9184113 Individual content types for multi form upload
When uploading multiple files through the form upload middleware
it would take the first file's content type and encoding and wrongly
apply it to all the uploaded files.

This was because there was a shared 'attributes' dict for all body parts
which was (correctly) filled from all the non-file parts. The file parts
however would also set their 'Content-Type' and 'Content-Encoding'
header values in the attributes dict.

The content type/encoding handling made sure to not overwrite
existing values in the dict. Thus the first file to specify such a
header value would win.

Now the individual files get their own content types/encodings by
applying the file part content type/encoding headers only to a copy
of the dict.

The file parts' headers continue to be ignored if they are overridden
by explicit 'content-type'/'content-encoding' body parts.

Closes-Bug: #1916017
Change-Id: I0a921afc481ef5b98cd5974c45d4b1d3e2a15731
2021-05-24 21:21:24 +02:00

2104 lines
83 KiB
Python

# Copyright (c) 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hmac
import unittest
from hashlib import sha1
from time import time
import six
from io import BytesIO
from swift.common.swob import Request, Response, wsgi_quote
from swift.common.middleware import tempauth, formpost
from swift.common.utils import split_path
from swift.proxy.controllers.base import get_cache_key
def hmac_msg(path, redirect, max_file_size, max_file_count, expires):
msg = '%s\n%s\n%s\n%s\n%s' % (
path, redirect, max_file_size, max_file_count, expires)
if six.PY3:
msg = msg.encode('utf-8')
return msg
class FakeApp(object):
def __init__(self, status_headers_body_iter=None,
check_no_query_string=True):
self.status_headers_body_iter = status_headers_body_iter
if not self.status_headers_body_iter:
self.status_headers_body_iter = iter([('404 Not Found', {
'x-test-header-one-a': 'value1',
'x-test-header-two-a': 'value2',
'x-test-header-two-b': 'value3'}, b'')])
self.requests = []
self.check_no_query_string = check_no_query_string
def __call__(self, env, start_response):
# use wsgi_quote to spot check that it really *is* a WSGI string
wsgi_quote(env['PATH_INFO'])
try:
if self.check_no_query_string and env.get('QUERY_STRING'):
raise Exception('Query string %s should have been discarded!' %
env['QUERY_STRING'])
body = b''
while True:
chunk = env['wsgi.input'].read()
if not chunk:
break
body += chunk
env['wsgi.input'] = BytesIO(body)
self.requests.append(Request.blank('', environ=env))
if env.get('swift.authorize_override') and \
env.get('REMOTE_USER') != '.wsgi.pre_authed':
raise Exception(
'Invalid REMOTE_USER %r with swift.authorize_override' % (
env.get('REMOTE_USER'),))
if 'swift.authorize' in env:
resp = env['swift.authorize'](self.requests[-1])
if resp:
return resp(env, start_response)
status, headers, body = next(self.status_headers_body_iter)
return Response(status=status, headers=headers,
body=body)(env, start_response)
except EOFError:
start_response('499 Client Disconnect',
[('Content-Type', 'text/plain')])
return [b'Client Disconnect\n']
class TestCappedFileLikeObject(unittest.TestCase):
def test_whole(self):
self.assertEqual(
formpost._CappedFileLikeObject(BytesIO(b'abc'), 10).read(),
b'abc')
def test_exceeded(self):
exc = None
try:
formpost._CappedFileLikeObject(BytesIO(b'abc'), 2).read()
except EOFError as err:
exc = err
self.assertEqual(str(exc), 'max_file_size exceeded')
def test_whole_readline(self):
fp = formpost._CappedFileLikeObject(BytesIO(b'abc\ndef'), 10)
self.assertEqual(fp.readline(), b'abc\n')
self.assertEqual(fp.readline(), b'def')
self.assertEqual(fp.readline(), b'')
def test_exceeded_readline(self):
fp = formpost._CappedFileLikeObject(BytesIO(b'abc\ndef'), 5)
self.assertEqual(fp.readline(), b'abc\n')
exc = None
try:
self.assertEqual(fp.readline(), b'def')
except EOFError as err:
exc = err
self.assertEqual(str(exc), 'max_file_size exceeded')
def test_read_sized(self):
fp = formpost._CappedFileLikeObject(BytesIO(b'abcdefg'), 10)
self.assertEqual(fp.read(2), b'ab')
self.assertEqual(fp.read(2), b'cd')
self.assertEqual(fp.read(2), b'ef')
self.assertEqual(fp.read(2), b'g')
self.assertEqual(fp.read(2), b'')
class TestFormPost(unittest.TestCase):
def setUp(self):
self.app = FakeApp()
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
def _make_request(self, path, tempurl_keys=(), **kwargs):
req = Request.blank(path, **kwargs)
# Fake out the caching layer so that get_account_info() finds its
# data. Include something that isn't tempurl keys to prove we skip it.
meta = {'user-job-title': 'Personal Trainer',
'user-real-name': 'Jim Shortz'}
for idx, key in enumerate(tempurl_keys):
meta_name = 'temp-url-key' + (("-%d" % (idx + 1) if idx else ""))
if key:
meta[meta_name] = key
_junk, account, _junk, _junk = split_path(path, 2, 4)
req.environ.setdefault('swift.infocache', {})
req.environ['swift.infocache'][get_cache_key(account)] = \
self._fake_cache_env(account, tempurl_keys)
return req
def _fake_cache_env(self, account, tempurl_keys=()):
# Fake out the caching layer so that get_account_info() finds its
# data. Include something that isn't tempurl keys to prove we skip it.
meta = {'user-job-title': 'Personal Trainer',
'user-real-name': 'Jim Shortz'}
for idx, key in enumerate(tempurl_keys):
meta_name = 'temp-url-key' + ("-%d" % (idx + 1) if idx else "")
if key:
meta[meta_name] = key
return {'status': 204,
'container_count': '0',
'total_object_count': '0',
'bytes': '0',
'meta': meta}
def _make_sig_env_body(self, path, redirect, max_file_size, max_file_count,
expires, key, user_agent=True):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
body = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file1"; '
'filename="testfile1.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'Content-Encoding: gzip',
'',
'Test\nFile\nTwo\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR--',
'',
]
if six.PY3:
body = [line.encode('utf-8') for line in body]
wsgi_errors = six.StringIO()
env = {
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=----WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'HTTP_ACCEPT_ENCODING': 'gzip, deflate',
'HTTP_ACCEPT_LANGUAGE': 'en-us',
'HTTP_ACCEPT': 'text/html,application/xhtml+xml,application/xml;'
'q=0.9,*/*;q=0.8',
'HTTP_CONNECTION': 'keep-alive',
'HTTP_HOST': 'ubuntu:8080',
'HTTP_ORIGIN': 'file://',
'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X '
'10_7_2) AppleWebKit/534.52.7 (KHTML, like Gecko) '
'Version/5.1.2 Safari/534.52.7',
'PATH_INFO': path,
'REMOTE_ADDR': '172.16.83.1',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {},
'wsgi.errors': wsgi_errors,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
if user_agent is False:
del env['HTTP_USER_AGENT']
return sig, env, body
def test_passthrough(self):
for method in ('HEAD', 'GET', 'PUT', 'POST', 'DELETE'):
resp = self._make_request(
'/v1/a/c/o',
environ={'REQUEST_METHOD': method}).get_response(self.formpost)
self.assertEqual(resp.status_int, 401)
self.assertNotIn(b'FormPost', resp.body)
def test_auth_scheme(self):
# FormPost rejects
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() - 10), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
authenticate_v = None
for h, v in headers:
if h.lower() == 'www-authenticate':
authenticate_v = v
self.assertTrue(b'FormPost: Form Expired' in body)
self.assertEqual('Swift realm="unknown"', authenticate_v)
def test_safari(self):
key = b'abc'
path = '/v1/AUTH_test/container'
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file1"; '
'filename="testfile1.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR--',
'',
])
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
wsgi_input = BytesIO(wsgi_input)
wsgi_errors = six.StringIO()
env = {
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=----WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'HTTP_ACCEPT_ENCODING': 'gzip, deflate',
'HTTP_ACCEPT_LANGUAGE': 'en-us',
'HTTP_ACCEPT': 'text/html,application/xhtml+xml,application/xml;'
'q=0.9,*/*;q=0.8',
'HTTP_CONNECTION': 'keep-alive',
'HTTP_HOST': 'ubuntu:8080',
'HTTP_ORIGIN': 'file://',
'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X '
'10_7_2) AppleWebKit/534.52.7 (KHTML, like Gecko) '
'Version/5.1.2 Safari/534.52.7',
'PATH_INFO': path,
'REMOTE_ADDR': '172.16.83.1',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {
get_cache_key('AUTH_test'): self._fake_cache_env(
'AUTH_test', [key]),
get_cache_key('AUTH_test', 'container'): {
'meta': {}}},
'wsgi.errors': wsgi_errors,
'wsgi.input': wsgi_input,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=201&message=' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_firefox(self):
key = b'abc'
path = '/v1/AUTH_test/container'
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
wsgi_input = '\r\n'.join([
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="file1"; '
'filename="testfile1.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'-----------------------------168072824752491622650073--',
''
])
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
wsgi_input = BytesIO(wsgi_input)
wsgi_errors = six.StringIO()
env = {
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=---------------------------168072824752491622650073',
'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
'HTTP_ACCEPT_ENCODING': 'gzip, deflate',
'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5',
'HTTP_ACCEPT': 'text/html,application/xhtml+xml,application/xml;'
'q=0.9,*/*;q=0.8',
'HTTP_CONNECTION': 'keep-alive',
'HTTP_HOST': 'ubuntu:8080',
'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; '
'rv:8.0.1) Gecko/20100101 Firefox/8.0.1',
'PATH_INFO': '/v1/AUTH_test/container',
'REMOTE_ADDR': '172.16.83.1',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {
get_cache_key('AUTH_test'): self._fake_cache_env(
'AUTH_test', [key]),
get_cache_key('AUTH_test', 'container'): {
'meta': {}}},
'wsgi.errors': wsgi_errors,
'wsgi.input': wsgi_input,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=201&message=' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_chrome(self):
key = b'abc'
path = '/v1/AUTH_test/container'
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="file1"; '
'filename="testfile1.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA--',
''
])
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
wsgi_input = BytesIO(wsgi_input)
wsgi_errors = six.StringIO()
env = {
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=----WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
'HTTP_ACCEPT_ENCODING': 'gzip,deflate,sdch',
'HTTP_ACCEPT_LANGUAGE': 'en-US,en;q=0.8',
'HTTP_ACCEPT': 'text/html,application/xhtml+xml,application/xml;'
'q=0.9,*/*;q=0.8',
'HTTP_CACHE_CONTROL': 'max-age=0',
'HTTP_CONNECTION': 'keep-alive',
'HTTP_HOST': 'ubuntu:8080',
'HTTP_ORIGIN': 'null',
'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X '
'10_7_2) AppleWebKit/535.7 (KHTML, like Gecko) '
'Chrome/16.0.912.63 Safari/535.7',
'PATH_INFO': '/v1/AUTH_test/container',
'REMOTE_ADDR': '172.16.83.1',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {
get_cache_key('AUTH_test'): self._fake_cache_env(
'AUTH_test', [key]),
get_cache_key('AUTH_test', 'container'): {
'meta': {}}},
'wsgi.errors': wsgi_errors,
'wsgi.input': wsgi_input,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=201&message=' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_explorer(self):
key = b'abc'
path = '/v1/AUTH_test/container'
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
wsgi_input = '\r\n'.join([
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="file1"; '
'filename="C:\\testfile1.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="file2"; '
'filename="C:\\testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'-----------------------------7db20d93017c--',
''
])
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
wsgi_input = BytesIO(wsgi_input)
wsgi_errors = six.StringIO()
env = {
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=---------------------------7db20d93017c',
'HTTP_ACCEPT_ENCODING': 'gzip, deflate',
'HTTP_ACCEPT_LANGUAGE': 'en-US',
'HTTP_ACCEPT': 'text/html, application/xhtml+xml, */*',
'HTTP_CACHE_CONTROL': 'no-cache',
'HTTP_CONNECTION': 'Keep-Alive',
'HTTP_HOST': '172.16.83.128:8080',
'HTTP_USER_AGENT': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT '
'6.1; WOW64; Trident/5.0)',
'PATH_INFO': '/v1/AUTH_test/container',
'REMOTE_ADDR': '172.16.83.129',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {
get_cache_key('AUTH_test'): self._fake_cache_env(
'AUTH_test', [key]),
get_cache_key('AUTH_test', 'container'): {
'meta': {}}},
'wsgi.errors': wsgi_errors,
'wsgi.input': wsgi_input,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=201&message=' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_curl_with_unicode(self):
key = b'abc'
path = u'/v1/AUTH_test/container/let_it_\N{SNOWFLAKE}/'
if six.PY2:
path = path.encode('utf-8')
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
sha1).hexdigest()
wsgi_input = '\r\n'.join([
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="file1"; '
'filename="\xe2\x98\x83.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'--------------------------dea19ac8502ca805--',
''
])
if not six.PY2:
wsgi_input = wsgi_input.encode('latin1')
wsgi_input = BytesIO(wsgi_input)
wsgi_errors = six.StringIO()
env = {
'CONTENT_LENGTH': str(len(wsgi_input.getvalue())),
'CONTENT_TYPE': 'multipart/form-data; '
'boundary=------------------------dea19ac8502ca805',
'HTTP_ACCEPT': '*/*',
'HTTP_HOST': 'ubuntu:8080',
'HTTP_USER_AGENT': 'curl/7.58.0',
'PATH_INFO': '/v1/AUTH_test/container/let_it_\xE2\x9D\x84/',
'REMOTE_ADDR': '172.16.83.1',
'REQUEST_METHOD': 'POST',
'SCRIPT_NAME': '',
'SERVER_NAME': '172.16.83.128',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'swift.infocache': {
get_cache_key('AUTH_test'): self._fake_cache_env(
'AUTH_test', [key]),
get_cache_key('AUTH_test', 'container'): {
'meta': {}}},
'wsgi.errors': wsgi_errors,
'wsgi.input': wsgi_input,
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0),
}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other', body)
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=201&message=' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(
self.app.requests[0].path,
'/v1/AUTH_test/container/let_it_%E2%9D%84/%E2%98%83.txt')
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(
self.app.requests[1].path,
'/v1/AUTH_test/container/let_it_%E2%9D%84/testfile2.txt')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_messed_up_start(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://brim.net', 5, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'XX' + b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
def log_assert_int_status(env, response_status_int):
self.assertIsInstance(response_status_int, int)
self.formpost._log_request = log_assert_int_status
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '400 Bad Request')
self.assertIsNone(exc_info)
self.assertIn(b'FormPost: invalid starting boundary', body)
self.assertEqual(len(self.app.requests), 0)
def test_max_file_size_exceeded(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://brim.net', 5, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '400 Bad Request')
self.assertIsNone(exc_info)
self.assertIn(b'FormPost: max_file_size exceeded', body)
self.assertEqual(len(self.app.requests), 0)
def test_max_file_count_exceeded(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://brim.net', 1024, 1,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(
location,
'http://brim.net?status=400&message=max%20file%20count%20exceeded')
self.assertIsNone(exc_info)
self.assertTrue(
b'http://brim.net?status=400&message=max%20file%20count%20exceeded'
in body)
self.assertEqual(len(self.app.requests), 1)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
def test_subrequest_does_not_pass_query(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['QUERY_STRING'] = 'this=should&not=get&passed'
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(
iter([('201 Created', {}, b''),
('201 Created', {}, b'')]),
check_no_query_string=True)
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
# Make sure we 201 Created, which means we made the final subrequest
# (and FakeApp verifies that no QUERY_STRING got passed).
self.assertEqual(status, '201 Created')
self.assertIsNone(exc_info)
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
def test_subrequest_fails(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://brim.net', 1024, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('404 Not Found', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://brim.net?status=404&message=')
self.assertIsNone(exc_info)
self.assertTrue(b'http://brim.net?status=404&message=' in body)
self.assertEqual(len(self.app.requests), 1)
def test_truncated_attr_value(self):
key = b'abc'
redirect = 'a' * formpost.MAX_VALUE_LENGTH
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', redirect, max_file_size, max_file_count,
expires, key)
# Tack on an extra char to redirect, but shouldn't matter since it
# should get truncated off on read.
redirect += 'b'
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file1"; '
'filename="testfile1.txt"',
'Content-Type: text/plain',
'',
'Test File\nOne\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file2"; '
'filename="testfile2.txt"',
'Content-Type: text/plain',
'',
'Test\nFile\nTwo\n',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file3"; filename=""',
'Content-Type: application/octet-stream',
'',
'',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR--',
'',
])
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(
location,
('a' * formpost.MAX_VALUE_LENGTH) + '?status=201&message=')
self.assertIsNone(exc_info)
self.assertIn(
(b'a' * formpost.MAX_VALUE_LENGTH) + b'?status=201&message=', body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_no_file_to_process(self):
key = b'abc'
redirect = 'http://brim.net'
max_file_size = 1024
max_file_count = 10
expires = int(time() + 86400)
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', redirect, max_file_size, max_file_count,
expires, key)
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
'',
redirect,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_size"',
'',
str(max_file_size),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="max_file_count"',
'',
str(max_file_count),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="expires"',
'',
str(expires),
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="signature"',
'',
sig,
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR--',
'',
])
if six.PY3:
wsgi_input = wsgi_input.encode('utf-8')
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(
location,
'http://brim.net?status=400&message=no%20files%20to%20process')
self.assertIsNone(exc_info)
self.assertTrue(
b'http://brim.net?status=400&message=no%20files%20to%20process'
in body)
self.assertEqual(len(self.app.requests), 0)
def test_formpost_without_useragent(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect', 1024, 10,
int(time() + 86400), key, user_agent=False)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
def start_response(s, h, e=None):
pass
body = b''.join(self.formpost(env, start_response))
self.assertIn('User-Agent', self.app.requests[0].headers)
self.assertEqual(self.app.requests[0].headers['User-Agent'],
'FormPost')
def test_formpost_with_origin(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect', 1024, 10,
int(time() + 86400), key, user_agent=False)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
env['HTTP_ORIGIN'] = 'http://localhost:5000'
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created',
{'Access-Control-Allow-Origin':
'http://localhost:5000'}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
headers = {}
def start_response(s, h, e=None):
for k, v in h:
headers[k] = v
pass
body = b''.join(self.formpost(env, start_response))
self.assertEqual(headers['Access-Control-Allow-Origin'],
'http://localhost:5000')
def test_formpost_with_multiple_keys(self):
key = b'ernie'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect', 1024, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
# Stick it in X-Account-Meta-Temp-URL-Key-2 and make sure we get it
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
body = b''.join(self.formpost(env, start_response))
self.assertEqual('303 See Other', status[0])
self.assertEqual(
'http://redirect?status=201&message=',
dict(headers[0]).get('Location'))
def test_formpost_with_multiple_container_keys(self):
first_key = b'ernie'
second_key = b'bert'
keys = [first_key, second_key]
meta = {}
for idx, key in enumerate(keys):
meta_name = 'temp-url-key' + ("-%d" % (idx + 1) if idx else "")
if key:
meta[meta_name] = key
for key in keys:
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect', 1024, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test'))
# Stick it in X-Container-Meta-Temp-URL-Key-2 and ensure we get it
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': meta}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
body = b''.join(self.formpost(env, start_response))
self.assertEqual('303 See Other', status[0])
self.assertEqual(
'http://redirect?status=201&message=',
dict(headers[0]).get('Location'))
def test_redirect(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect', 1024, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location, 'http://redirect?status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(location.encode('utf-8') in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_redirect_with_query(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', 'http://redirect?one=two', 1024, 10,
int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '303 See Other')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertEqual(location,
'http://redirect?one=two&status=201&message=')
self.assertIsNone(exc_info)
self.assertTrue(location.encode('utf-8') in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_no_redirect(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
def test_no_redirect_expired(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() - 10), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Form Expired' in body)
def test_no_redirect_invalid_sig(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
# Change key to invalidate sig
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key + b' is bogus now']))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Invalid Signature' in body)
def test_no_redirect_with_error(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'XX' + b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '400 Bad Request')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: invalid starting boundary' in body)
def test_no_v1(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v2/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Invalid Signature' in body)
def test_empty_v1(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'//AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Invalid Signature' in body)
def test_empty_account(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1//container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Invalid Signature' in body)
def test_wrong_account(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_tst/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([
('200 Ok', {'x-account-meta-temp-url-key': 'def'}, b''),
('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Invalid Signature' in body)
def test_no_container(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '401 Unauthorized')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: Invalid Signature' in body)
def test_completely_non_int_expires(self):
key = b'abc'
expires = int(time() + 86400)
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, expires, key)
for i, v in enumerate(body):
if v.decode('utf-8') == str(expires):
body[i] = b'badvalue'
break
env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '400 Bad Request')
location = None
for h, v in headers:
if h.lower() == 'location':
location = v
self.assertIsNone(location)
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: expired not an integer' in body)
def test_x_delete_at(self):
delete_at = int(time() + 100)
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_at"',
'',
str(delete_at),
]
if six.PY3:
x_delete_body_part = [line.encode('utf-8')
for line in x_delete_body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(x_delete_body_part + body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertIn("X-Delete-At", self.app.requests[0].headers)
self.assertIn("X-Delete-At", self.app.requests[1].headers)
self.assertEqual(delete_at,
self.app.requests[0].headers["X-Delete-At"])
self.assertEqual(delete_at,
self.app.requests[1].headers["X-Delete-At"])
def test_x_delete_at_not_int(self):
delete_at = "2014-07-16"
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_at"',
'',
str(delete_at),
]
if six.PY3:
x_delete_body_part = [line.encode('utf-8')
for line in x_delete_body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(x_delete_body_part + body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '400 Bad Request')
self.assertTrue(b'FormPost: x_delete_at not an integer' in body)
def test_x_delete_after(self):
delete_after = 100
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_after"',
'',
str(delete_after),
]
if six.PY3:
x_delete_body_part = [line.encode('utf-8')
for line in x_delete_body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(x_delete_body_part + body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertIn("X-Delete-After", self.app.requests[0].headers)
self.assertIn("X-Delete-After", self.app.requests[1].headers)
self.assertEqual(delete_after,
self.app.requests[0].headers["X-Delete-After"])
self.assertEqual(delete_after,
self.app.requests[1].headers["X-Delete-After"])
def test_x_delete_after_not_int(self):
delete_after = "2 days"
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_after"',
'',
str(delete_after),
]
if six.PY3:
x_delete_body_part = [line.encode('utf-8')
for line in x_delete_body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(x_delete_body_part + body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '400 Bad Request')
self.assertTrue(b'FormPost: x_delete_after not an integer' in body)
def test_global_content_type_encoding(self):
body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="content-encoding"',
'',
'gzip',
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="content-type"',
'',
'text/html',
]
if six.PY3:
body_part = [line.encode('utf-8') for line in body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(body_part + body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertIn("Content-Type", self.app.requests[0].headers)
self.assertIn("Content-Type", self.app.requests[1].headers)
self.assertIn("Content-Encoding", self.app.requests[0].headers)
self.assertIn("Content-Encoding", self.app.requests[1].headers)
self.assertEqual("text/html",
self.app.requests[0].headers["Content-Type"])
self.assertEqual("text/html",
self.app.requests[1].headers["Content-Type"])
self.assertEqual("gzip",
self.app.requests[0].headers["Content-Encoding"])
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
def test_single_content_type_encoding(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
wsgi_input = b'\r\n'.join(body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b''),
('201 Created', {}, b'')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEqual(status, '201 Created')
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 2)
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
self.assertIn("Content-Type", self.app.requests[0].headers)
self.assertIn("Content-Type", self.app.requests[1].headers)
self.assertEqual("text/plain",
self.app.requests[0].headers["Content-Type"])
self.assertEqual("text/plain",
self.app.requests[1].headers["Content-Type"])
self.assertFalse("Content-Encoding" in self.app.requests[0].headers)
self.assertIn("Content-Encoding", self.app.requests[1].headers)
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
def test_multiple_content_type_encoding(self):
body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file4"; '
'filename="testfile4.txt"',
'Content-Type: application/json',
'',
'{"four": 4}\n',
]
if six.PY3:
body_part = [line.encode('utf-8') for line in body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
# splice in another file with a different content type
before_closing_boundary = len(body) - 2
body[before_closing_boundary:before_closing_boundary] = body_part
wsgi_input = b'\r\n'.join(body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b'')] * 3))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
self.assertEqual(status, '201 Created')
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 3)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
self.assertEqual(self.app.requests[2].body, b'{"four": 4}\n')
self.assertIn("Content-Type", self.app.requests[0].headers)
self.assertIn("Content-Type", self.app.requests[1].headers)
self.assertIn("Content-Type", self.app.requests[2].headers)
self.assertEqual("text/plain",
self.app.requests[0].headers["Content-Type"])
self.assertEqual("text/plain",
self.app.requests[1].headers["Content-Type"])
self.assertEqual("application/json",
self.app.requests[2].headers["Content-Type"])
self.assertFalse("Content-Encoding" in self.app.requests[0].headers)
self.assertIn("Content-Encoding", self.app.requests[1].headers)
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
self.assertFalse("Content-Encoding" in self.app.requests[2].headers)
if __name__ == '__main__':
unittest.main()