py3: Fix formpost unicode filename issues
Previously, we took the native string filename attribute and put it directly in the (WSGI string) PATH_INFO field. Now, we convert it to a WSGI string first. Change-Id: I30e3beb8707b88c36bd3cdc7a0887d069e943ba6 Closes-Bug: #1858259
This commit is contained in:
parent
a4f1078864
commit
9483630ae1
@ -129,13 +129,14 @@ from time import time
|
||||
import six
|
||||
from six.moves.urllib.parse import quote
|
||||
|
||||
from swift.common.constraints import valid_api_version
|
||||
from swift.common.exceptions import MimeInvalid
|
||||
from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata
|
||||
from swift.common.utils import streq_const_time, register_swift_info, \
|
||||
parse_content_disposition, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, reiterate, close_if_possible
|
||||
from swift.common.wsgi import make_pre_authed_env
|
||||
from swift.common.swob import HTTPUnauthorized, wsgi_to_str
|
||||
from swift.common.swob import HTTPUnauthorized, wsgi_to_str, str_to_wsgi
|
||||
from swift.proxy.controllers.base import get_account_info, get_container_info
|
||||
|
||||
|
||||
@ -365,7 +366,8 @@ class FormPost(object):
|
||||
if not subenv['PATH_INFO'].endswith('/') and \
|
||||
subenv['PATH_INFO'].count('/') < 4:
|
||||
subenv['PATH_INFO'] += '/'
|
||||
subenv['PATH_INFO'] += attributes['filename'] or 'filename'
|
||||
subenv['PATH_INFO'] += str_to_wsgi(
|
||||
attributes['filename'] or 'filename')
|
||||
if 'x_delete_at' in attributes:
|
||||
try:
|
||||
subenv['HTTP_X_DELETE_AT'] = int(attributes['x_delete_at'])
|
||||
@ -442,8 +444,8 @@ class FormPost(object):
|
||||
:returns: list of tempurl keys
|
||||
"""
|
||||
parts = env['PATH_INFO'].split('/', 4)
|
||||
if len(parts) < 4 or parts[0] or parts[1] != 'v1' or not parts[2] or \
|
||||
not parts[3]:
|
||||
if len(parts) < 4 or parts[0] or not valid_api_version(parts[1]) \
|
||||
or not parts[2] or not parts[3]:
|
||||
return []
|
||||
|
||||
account_info = get_account_info(env, self.app, swift_source='FP')
|
||||
|
@ -21,7 +21,7 @@ from time import time
|
||||
import six
|
||||
from io import BytesIO
|
||||
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.swob import Request, Response, wsgi_quote
|
||||
from swift.common.middleware import tempauth, formpost
|
||||
from swift.common.utils import split_path
|
||||
from swift.proxy.controllers.base import get_cache_key
|
||||
@ -49,6 +49,8 @@ class FakeApp(object):
|
||||
self.check_no_query_string = check_no_query_string
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
# use wsgi_quote to spot check that it really *is* a WSGI string
|
||||
wsgi_quote(env['PATH_INFO'])
|
||||
try:
|
||||
if self.check_no_query_string and env.get('QUERY_STRING'):
|
||||
raise Exception('Query string %s should have been discarded!' %
|
||||
@ -756,6 +758,126 @@ class TestFormPost(unittest.TestCase):
|
||||
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
|
||||
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
|
||||
|
||||
def test_curl_with_unicode(self):
|
||||
key = b'abc'
|
||||
path = u'/v1/AUTH_test/container/let_it_\N{SNOWFLAKE}/'
|
||||
if six.PY2:
|
||||
path = path.encode('utf-8')
|
||||
redirect = 'http://brim.net'
|
||||
max_file_size = 1024
|
||||
max_file_count = 10
|
||||
expires = int(time() + 86400)
|
||||
sig = hmac.new(
|
||||
key,
|
||||
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
|
||||
sha1).hexdigest()
|
||||
wsgi_input = '\r\n'.join([
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="redirect"',
|
||||
'',
|
||||
redirect,
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="max_file_size"',
|
||||
'',
|
||||
str(max_file_size),
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="max_file_count"',
|
||||
'',
|
||||
str(max_file_count),
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="expires"',
|
||||
'',
|
||||
str(expires),
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="signature"',
|
||||
'',
|
||||
sig,
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="file1"; '
|
||||
'filename="\xe2\x98\x83.txt"',
|
||||
'Content-Type: text/plain',
|
||||
'',
|
||||
'Test File\nOne\n',
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="file2"; '
|
||||
'filename="testfile2.txt"',
|
||||
'Content-Type: text/plain',
|
||||
'',
|
||||
'Test\nFile\nTwo\n',
|
||||
'--------------------------dea19ac8502ca805',
|
||||
'Content-Disposition: form-data; name="file3"; filename=""',
|
||||
'Content-Type: application/octet-stream',
|
||||
'',
|
||||
'',
|
||||
'--------------------------dea19ac8502ca805--',
|
||||
''
|
||||
])
|
||||
if not six.PY2:
|
||||
wsgi_input = wsgi_input.encode('latin1')
|
||||
wsgi_input = BytesIO(wsgi_input)
|
||||
wsgi_errors = six.StringIO()
|
||||
env = {
|
||||
'CONTENT_LENGTH': str(len(wsgi_input.getvalue())),
|
||||
'CONTENT_TYPE': 'multipart/form-data; '
|
||||
'boundary=------------------------dea19ac8502ca805',
|
||||
'HTTP_ACCEPT': '*/*',
|
||||
'HTTP_HOST': 'ubuntu:8080',
|
||||
'HTTP_USER_AGENT': 'curl/7.58.0',
|
||||
'PATH_INFO': '/v1/AUTH_test/container/let_it_\xE2\x9D\x84/',
|
||||
'REMOTE_ADDR': '172.16.83.1',
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'SCRIPT_NAME': '',
|
||||
'SERVER_NAME': '172.16.83.128',
|
||||
'SERVER_PORT': '8080',
|
||||
'SERVER_PROTOCOL': 'HTTP/1.0',
|
||||
'swift.infocache': {
|
||||
get_cache_key('AUTH_test'): self._fake_cache_env(
|
||||
'AUTH_test', [key]),
|
||||
get_cache_key('AUTH_test', 'container'): {
|
||||
'meta': {}}},
|
||||
'wsgi.errors': wsgi_errors,
|
||||
'wsgi.input': wsgi_input,
|
||||
'wsgi.multiprocess': False,
|
||||
'wsgi.multithread': True,
|
||||
'wsgi.run_once': False,
|
||||
'wsgi.url_scheme': 'http',
|
||||
'wsgi.version': (1, 0),
|
||||
}
|
||||
self.app = FakeApp(iter([('201 Created', {}, b''),
|
||||
('201 Created', {}, b'')]))
|
||||
self.auth = tempauth.filter_factory({})(self.app)
|
||||
self.formpost = formpost.filter_factory({})(self.auth)
|
||||
status = [None]
|
||||
headers = [None]
|
||||
exc_info = [None]
|
||||
|
||||
def start_response(s, h, e=None):
|
||||
status[0] = s
|
||||
headers[0] = h
|
||||
exc_info[0] = e
|
||||
|
||||
body = b''.join(self.formpost(env, start_response))
|
||||
status = status[0]
|
||||
headers = headers[0]
|
||||
exc_info = exc_info[0]
|
||||
self.assertEqual(status, '303 See Other', body)
|
||||
location = None
|
||||
for h, v in headers:
|
||||
if h.lower() == 'location':
|
||||
location = v
|
||||
self.assertEqual(location, 'http://brim.net?status=201&message=')
|
||||
self.assertIsNone(exc_info)
|
||||
self.assertTrue(b'http://brim.net?status=201&message=' in body)
|
||||
self.assertEqual(len(self.app.requests), 2)
|
||||
self.assertEqual(
|
||||
self.app.requests[0].path,
|
||||
'/v1/AUTH_test/container/let_it_%E2%9D%84/%E2%98%83.txt')
|
||||
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
|
||||
self.assertEqual(
|
||||
self.app.requests[1].path,
|
||||
'/v1/AUTH_test/container/let_it_%E2%9D%84/testfile2.txt')
|
||||
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
|
||||
|
||||
def test_messed_up_start(self):
|
||||
key = b'abc'
|
||||
sig, env, body = self._make_sig_env_body(
|
||||
|
Loading…
Reference in New Issue
Block a user