py3: slo
This adds wsgi_to_str(self.path_info) everywhere we forgot it, not only in the slo module itself. Dropping the body=''.join(body) after call_slo() is obvious: the latter only returns strings of bytes, not lists of such. Change-Id: I6b4d87e4cda4945bc128dbc9c1edd39e736a59d2
This commit is contained in:
parent
f1a0eccab9
commit
bd8c3067b4
@ -42,7 +42,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
|
||||
HTTPCreated, HTTPForbidden, HTTPInternalServerError, \
|
||||
HTTPMethodNotAllowed, HTTPNoContent, HTTPNotFound, \
|
||||
HTTPPreconditionFailed, HTTPConflict, Request, \
|
||||
HTTPInsufficientStorage, HTTPException
|
||||
HTTPInsufficientStorage, HTTPException, wsgi_to_str
|
||||
from swift.common.request_helpers import is_sys_or_user_meta
|
||||
|
||||
|
||||
@ -299,7 +299,7 @@ class AccountController(BaseStorageServer):
|
||||
start_time = time.time()
|
||||
req = Request(env)
|
||||
self.logger.txn_id = req.headers.get('x-trans-id', None)
|
||||
if not check_utf8(req.path_info):
|
||||
if not check_utf8(wsgi_to_str(req.path_info)):
|
||||
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
|
||||
else:
|
||||
try:
|
||||
|
@ -319,9 +319,11 @@ from datetime import datetime
|
||||
import json
|
||||
import mimetypes
|
||||
import re
|
||||
import six
|
||||
import time
|
||||
from hashlib import md5
|
||||
|
||||
import six
|
||||
|
||||
from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.middleware.listing_formats import \
|
||||
MAX_CONTAINER_LISTING_CONTENT_LENGTH
|
||||
@ -329,7 +331,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
||||
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
|
||||
HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \
|
||||
HTTPUnauthorized, HTTPConflict, HTTPUnprocessableEntity, Response, Range, \
|
||||
RESPONSE_REASONS
|
||||
RESPONSE_REASONS, str_to_wsgi, wsgi_to_str, wsgi_quote
|
||||
from swift.common.utils import get_logger, config_true_value, \
|
||||
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
|
||||
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
|
||||
@ -498,7 +500,10 @@ def parse_and_validate_input(req_body, req_path):
|
||||
% (seg_index,))
|
||||
continue
|
||||
# re-encode to normalize padding
|
||||
seg_dict['data'] = base64.b64encode(data)
|
||||
if six.PY2:
|
||||
seg_dict['data'] = base64.b64encode(data)
|
||||
else:
|
||||
seg_dict['data'] = base64.b64encode(data).decode('ascii')
|
||||
|
||||
if parsed_data and all('data' in d for d in parsed_data):
|
||||
errors.append(b"Inline data segments require at least one "
|
||||
@ -524,9 +529,18 @@ class SloGetContext(WSGIContext):
|
||||
"""
|
||||
Fetch the submanifest, parse it, and return it.
|
||||
Raise exception on failures.
|
||||
|
||||
:param req: the upstream request
|
||||
:param version: whatever
|
||||
:param acc: native
|
||||
:param con: native
|
||||
:param obj: native
|
||||
"""
|
||||
sub_req = make_subrequest(
|
||||
req.environ, path=quote('/'.join(['', version, acc, con, obj])),
|
||||
req.environ,
|
||||
path=wsgi_quote('/'.join([
|
||||
'', str_to_wsgi(version),
|
||||
str_to_wsgi(acc), str_to_wsgi(con), str_to_wsgi(obj)])),
|
||||
method='GET',
|
||||
headers={'x-auth-token': req.headers.get('x-auth-token')},
|
||||
agent='%(orig)s SLO MultipartGET', swift_source='SLO')
|
||||
@ -541,7 +555,7 @@ class SloGetContext(WSGIContext):
|
||||
|
||||
try:
|
||||
with closing_if_possible(sub_resp.app_iter):
|
||||
return json.loads(''.join(sub_resp.app_iter))
|
||||
return json.loads(b''.join(sub_resp.app_iter))
|
||||
except ValueError as err:
|
||||
raise ListingIterError(
|
||||
'while fetching %s, JSON-decoding of submanifest %s '
|
||||
@ -656,7 +670,10 @@ class SloGetContext(WSGIContext):
|
||||
"While processing manifest %r, "
|
||||
"max recursion depth was exceeded" % req.path)
|
||||
|
||||
sub_path = get_valid_utf8_str(seg_dict['name'])
|
||||
if six.PY2:
|
||||
sub_path = get_valid_utf8_str(seg_dict['name'])
|
||||
else:
|
||||
sub_path = seg_dict['name']
|
||||
sub_cont, sub_obj = split_path(sub_path, 2, 2, True)
|
||||
if last_sub_path != sub_path:
|
||||
sub_segments = cached_fetch_sub_slo_segments(
|
||||
@ -675,7 +692,7 @@ class SloGetContext(WSGIContext):
|
||||
recursion_depth=recursion_depth + 1):
|
||||
yield sub_seg_dict
|
||||
else:
|
||||
if isinstance(seg_dict['name'], six.text_type):
|
||||
if six.PY2 and isinstance(seg_dict['name'], six.text_type):
|
||||
seg_dict['name'] = seg_dict['name'].encode("utf-8")
|
||||
yield dict(seg_dict,
|
||||
first_byte=max(0, first_byte) + range_start,
|
||||
@ -865,7 +882,7 @@ class SloGetContext(WSGIContext):
|
||||
|
||||
def _get_manifest_read(self, resp_iter):
|
||||
with closing_if_possible(resp_iter):
|
||||
resp_body = ''.join(resp_iter)
|
||||
resp_body = b''.join(resp_iter)
|
||||
try:
|
||||
segments = json.loads(resp_body)
|
||||
except ValueError:
|
||||
@ -904,13 +921,12 @@ class SloGetContext(WSGIContext):
|
||||
|
||||
if slo_etag is None:
|
||||
if 'raw_data' in seg_dict:
|
||||
calculated_etag.update(
|
||||
md5(seg_dict['raw_data']).hexdigest())
|
||||
r = md5(seg_dict['raw_data']).hexdigest()
|
||||
elif seg_dict.get('range'):
|
||||
calculated_etag.update(
|
||||
'%s:%s;' % (seg_dict['hash'], seg_dict['range']))
|
||||
r = '%s:%s;' % (seg_dict['hash'], seg_dict['range'])
|
||||
else:
|
||||
calculated_etag.update(seg_dict['hash'])
|
||||
r = seg_dict['hash']
|
||||
calculated_etag.update(r.encode('ascii') if six.PY3 else r)
|
||||
|
||||
if content_length is None:
|
||||
if config_true_value(seg_dict.get('sub_slo')):
|
||||
@ -934,7 +950,7 @@ class SloGetContext(WSGIContext):
|
||||
|
||||
def _manifest_head_response(self, req, response_headers):
|
||||
conditional_etag = resolve_etag_is_at_header(req, response_headers)
|
||||
return HTTPOk(request=req, headers=response_headers, body='',
|
||||
return HTTPOk(request=req, headers=response_headers, body=b'',
|
||||
conditional_etag=conditional_etag,
|
||||
conditional_response=True)
|
||||
|
||||
@ -950,6 +966,7 @@ class SloGetContext(WSGIContext):
|
||||
byteranges = []
|
||||
|
||||
ver, account, _junk = req.split_path(3, 3, rest_with_last=True)
|
||||
account = wsgi_to_str(account)
|
||||
plain_listing_iter = self._segment_listing_iterator(
|
||||
req, ver, account, segments, byteranges)
|
||||
|
||||
@ -1073,18 +1090,19 @@ class StaticLargeObject(object):
|
||||
:raises HttpException: on errors
|
||||
"""
|
||||
vrs, account, container, obj = req.split_path(4, rest_with_last=True)
|
||||
if req.content_length > self.max_manifest_size:
|
||||
raise HTTPRequestEntityTooLarge(
|
||||
"Manifest File > %d bytes" % self.max_manifest_size)
|
||||
if req.headers.get('X-Copy-From'):
|
||||
raise HTTPMethodNotAllowed(
|
||||
'Multipart Manifest PUTs cannot be COPY requests')
|
||||
if req.content_length is None and \
|
||||
req.headers.get('transfer-encoding', '').lower() != 'chunked':
|
||||
raise HTTPLengthRequired(request=req)
|
||||
if req.content_length is None:
|
||||
if req.headers.get('transfer-encoding', '').lower() != 'chunked':
|
||||
raise HTTPLengthRequired(request=req)
|
||||
else:
|
||||
if req.content_length > self.max_manifest_size:
|
||||
raise HTTPRequestEntityTooLarge(
|
||||
"Manifest File > %d bytes" % self.max_manifest_size)
|
||||
parsed_data = parse_and_validate_input(
|
||||
req.body_file.read(self.max_manifest_size),
|
||||
req.path)
|
||||
wsgi_to_str(req.path))
|
||||
problem_segments = []
|
||||
|
||||
object_segments = [seg for seg in parsed_data if 'path' in seg]
|
||||
@ -1109,8 +1127,13 @@ class StaticLargeObject(object):
|
||||
path2indices[seg_dict['path']].append(index)
|
||||
|
||||
def do_head(obj_name):
|
||||
obj_path = quote('/'.join([
|
||||
'', vrs, account, get_valid_utf8_str(obj_name).lstrip('/')]))
|
||||
if six.PY2:
|
||||
obj_path = '/'.join(['', vrs, account,
|
||||
get_valid_utf8_str(obj_name).lstrip('/')])
|
||||
else:
|
||||
obj_path = '/'.join(['', vrs, account,
|
||||
str_to_wsgi(obj_name.lstrip('/'))])
|
||||
obj_path = wsgi_quote(obj_path)
|
||||
|
||||
sub_req = make_subrequest(
|
||||
req.environ, path=obj_path + '?', # kill the query string
|
||||
@ -1194,7 +1217,7 @@ class StaticLargeObject(object):
|
||||
return segment_length, seg_data
|
||||
|
||||
heartbeat = config_true_value(req.params.get('heartbeat'))
|
||||
separator = ''
|
||||
separator = b''
|
||||
if heartbeat:
|
||||
# Apparently some ways of deploying require that this to happens
|
||||
# *before* the return? Not sure why.
|
||||
@ -1202,13 +1225,13 @@ class StaticLargeObject(object):
|
||||
start_response('202 Accepted', [ # NB: not 201 !
|
||||
('Content-Type', out_content_type),
|
||||
])
|
||||
separator = '\r\n\r\n'
|
||||
separator = b'\r\n\r\n'
|
||||
|
||||
def resp_iter(total_size=total_size):
|
||||
# wsgi won't propagate start_response calls until some data has
|
||||
# been yielded so make sure first heartbeat is sent immediately
|
||||
if heartbeat:
|
||||
yield ' '
|
||||
yield b' '
|
||||
last_yield_time = time.time()
|
||||
with StreamingPile(self.concurrency) as pile:
|
||||
for obj_name, resp in pile.asyncstarmap(do_head, (
|
||||
@ -1218,7 +1241,7 @@ class StaticLargeObject(object):
|
||||
self.yield_frequency):
|
||||
# Make sure we've called start_response before
|
||||
# sending data
|
||||
yield ' '
|
||||
yield b' '
|
||||
last_yield_time = now
|
||||
for i in path2indices[obj_name]:
|
||||
segment_length, seg_data = validate_seg_dict(
|
||||
@ -1241,7 +1264,10 @@ class StaticLargeObject(object):
|
||||
resp_dict = {}
|
||||
if heartbeat:
|
||||
resp_dict['Response Status'] = err.status
|
||||
resp_dict['Response Body'] = err.body or '\n'.join(
|
||||
err_body = err.body
|
||||
if six.PY3:
|
||||
err_body = err_body.decode('utf-8', errors='replace')
|
||||
resp_dict['Response Body'] = err_body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, ['']))
|
||||
else:
|
||||
start_response(err.status,
|
||||
@ -1255,23 +1281,28 @@ class StaticLargeObject(object):
|
||||
for seg_data in data_for_storage:
|
||||
if 'data' in seg_data:
|
||||
raw_data = base64.b64decode(seg_data['data'])
|
||||
slo_etag.update(md5(raw_data).hexdigest())
|
||||
r = md5(raw_data).hexdigest()
|
||||
elif seg_data.get('range'):
|
||||
slo_etag.update('%s:%s;' % (seg_data['hash'],
|
||||
seg_data['range']))
|
||||
r = '%s:%s;' % (seg_data['hash'], seg_data['range'])
|
||||
else:
|
||||
slo_etag.update(seg_data['hash'])
|
||||
r = seg_data['hash']
|
||||
slo_etag.update(r.encode('ascii') if six.PY3 else r)
|
||||
|
||||
slo_etag = slo_etag.hexdigest()
|
||||
client_etag = req.headers.get('Etag')
|
||||
if client_etag and client_etag.strip('"') != slo_etag:
|
||||
err = HTTPUnprocessableEntity(request=req)
|
||||
if heartbeat:
|
||||
yield separator + get_response_body(out_content_type, {
|
||||
'Response Status': err.status,
|
||||
'Response Body': err.body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, [''])),
|
||||
}, problem_segments, 'upload')
|
||||
resp_dict = {}
|
||||
resp_dict['Response Status'] = err.status
|
||||
err_body = err.body
|
||||
if six.PY3 and isinstance(err_body, bytes):
|
||||
err_body = err_body.decode('utf-8', errors='replace')
|
||||
resp_dict['Response Body'] = err_body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, ['']))
|
||||
yield separator + get_response_body(
|
||||
out_content_type, resp_dict, problem_segments,
|
||||
'upload')
|
||||
else:
|
||||
for chunk in err(req.environ, start_response):
|
||||
yield chunk
|
||||
@ -1298,7 +1329,8 @@ class StaticLargeObject(object):
|
||||
|
||||
env = req.environ
|
||||
if not env.get('CONTENT_TYPE'):
|
||||
guessed_type, _junk = mimetypes.guess_type(req.path_info)
|
||||
guessed_type, _junk = mimetypes.guess_type(
|
||||
wsgi_to_str(req.path_info))
|
||||
env['CONTENT_TYPE'] = (guessed_type or
|
||||
'application/octet-stream')
|
||||
env['swift.content_type_overridden'] = True
|
||||
@ -1312,7 +1344,10 @@ class StaticLargeObject(object):
|
||||
resp_dict['Last Modified'] = resp.headers['Last-Modified']
|
||||
|
||||
if heartbeat:
|
||||
resp_dict['Response Body'] = resp.body
|
||||
resp_body = resp.body
|
||||
if six.PY3 and isinstance(resp_body, bytes):
|
||||
resp_body = resp_body.decode('utf-8')
|
||||
resp_dict['Response Body'] = resp_body
|
||||
yield separator + get_response_body(
|
||||
out_content_type, resp_dict, [], 'upload')
|
||||
else:
|
||||
@ -1332,14 +1367,18 @@ class StaticLargeObject(object):
|
||||
:raises HTTPBadRequest: on too many buffered sub segments and
|
||||
on invalid SLO manifest path
|
||||
"""
|
||||
if not check_utf8(req.path_info):
|
||||
if not check_utf8(wsgi_to_str(req.path_info)):
|
||||
raise HTTPPreconditionFailed(
|
||||
request=req, body='Invalid UTF8 or contains NULL')
|
||||
vrs, account, container, obj = req.split_path(4, 4, True)
|
||||
if six.PY2:
|
||||
obj_path = ('/%s/%s' % (container, obj)).decode('utf-8')
|
||||
else:
|
||||
obj_path = '/%s/%s' % (wsgi_to_str(container), wsgi_to_str(obj))
|
||||
|
||||
segments = [{
|
||||
'sub_slo': True,
|
||||
'name': ('/%s/%s' % (container, obj)).decode('utf-8')}]
|
||||
'name': obj_path}]
|
||||
while segments:
|
||||
if len(segments) > MAX_BUFFERED_SLO_SEGMENTS:
|
||||
raise HTTPBadRequest(
|
||||
@ -1353,14 +1392,18 @@ class StaticLargeObject(object):
|
||||
self.get_slo_segments(seg_data['name'], req))
|
||||
except HTTPException as err:
|
||||
# allow bulk delete response to report errors
|
||||
err_body = err.body
|
||||
if six.PY3 and isinstance(err_body, bytes):
|
||||
err_body = err_body.decode('utf-8', errors='replace')
|
||||
seg_data['error'] = {'code': err.status_int,
|
||||
'message': err.body}
|
||||
'message': err_body}
|
||||
|
||||
# add manifest back to be deleted after segments
|
||||
seg_data['sub_slo'] = False
|
||||
segments.append(seg_data)
|
||||
else:
|
||||
seg_data['name'] = seg_data['name'].encode('utf-8')
|
||||
if six.PY2:
|
||||
seg_data['name'] = seg_data['name'].encode('utf-8')
|
||||
yield seg_data
|
||||
|
||||
def get_slo_segments(self, obj_name, req):
|
||||
@ -1386,9 +1429,15 @@ class StaticLargeObject(object):
|
||||
new_env['HTTP_USER_AGENT'] = \
|
||||
'%s MultipartDELETE' % new_env.get('HTTP_USER_AGENT')
|
||||
new_env['swift.source'] = 'SLO'
|
||||
new_env['PATH_INFO'] = (
|
||||
'/%s/%s/%s' % (vrs, account, obj_name.lstrip('/').encode('utf-8'))
|
||||
)
|
||||
if six.PY2:
|
||||
new_env['PATH_INFO'] = (
|
||||
'/%s/%s/%s' % (vrs, account,
|
||||
obj_name.lstrip('/').encode('utf-8'))
|
||||
)
|
||||
else:
|
||||
new_env['PATH_INFO'] = (
|
||||
'/%s/%s/%s' % (vrs, account, str_to_wsgi(obj_name.lstrip('/')))
|
||||
)
|
||||
resp = Request.blank('', new_env).get_response(self.app)
|
||||
|
||||
if resp.is_success:
|
||||
|
@ -531,8 +531,7 @@ class VersionedWritesContext(WSGIContext):
|
||||
|
||||
put_path_info = "/%s/%s/%s/%s" % (
|
||||
api_version, account_name, container_name, object_name)
|
||||
put_resp = self._put_versioned_obj(
|
||||
req, put_path_info, get_resp)
|
||||
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
|
||||
|
||||
self._check_response_error(req, put_resp)
|
||||
close_if_possible(put_resp.app_iter)
|
||||
|
@ -921,9 +921,10 @@ class Request(object):
|
||||
"""
|
||||
headers = headers or {}
|
||||
environ = environ or {}
|
||||
if six.PY2 and isinstance(path, six.text_type):
|
||||
path = path.encode('utf-8')
|
||||
elif not six.PY2:
|
||||
if six.PY2:
|
||||
if isinstance(path, six.text_type):
|
||||
path = path.encode('utf-8')
|
||||
else:
|
||||
if isinstance(path, six.binary_type):
|
||||
path = path.decode('latin1')
|
||||
else:
|
||||
|
@ -4450,6 +4450,10 @@ def mime_to_document_iters(input_file, boundary, read_chunk_size=4096):
|
||||
(e.g. "divider", not "--divider")
|
||||
:param read_chunk_size: size of strings read via input_file.read()
|
||||
"""
|
||||
if six.PY3 and isinstance(boundary, str):
|
||||
# Since the boundary is in client-supplied headers, it can contain
|
||||
# garbage that trips us and we don't like client-induced 500.
|
||||
boundary = boundary.encode('latin-1', errors='replace')
|
||||
doc_files = iter_multipart_mime_documents(input_file, boundary,
|
||||
read_chunk_size)
|
||||
for i, doc_file in enumerate(doc_files):
|
||||
|
@ -780,7 +780,7 @@ class ContainerController(BaseStorageServer):
|
||||
start_time = time.time()
|
||||
req = Request(env)
|
||||
self.logger.txn_id = req.headers.get('x-trans-id', None)
|
||||
if not check_utf8(req.path_info):
|
||||
if not check_utf8(wsgi_to_str(req.path_info)):
|
||||
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
|
||||
else:
|
||||
try:
|
||||
|
@ -55,7 +55,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
|
||||
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
|
||||
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \
|
||||
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HTTPConflict, \
|
||||
HTTPServerError, wsgi_to_bytes
|
||||
HTTPServerError, wsgi_to_bytes, wsgi_to_str
|
||||
from swift.obj.diskfile import RESERVED_DATAFILE_META, DiskFileRouter
|
||||
|
||||
|
||||
@ -1271,7 +1271,7 @@ class ObjectController(BaseStorageServer):
|
||||
req = Request(env)
|
||||
self.logger.txn_id = req.headers.get('x-trans-id', None)
|
||||
|
||||
if not check_utf8(req.path_info):
|
||||
if not check_utf8(wsgi_to_str(req.path_info)):
|
||||
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
|
||||
else:
|
||||
try:
|
||||
|
File diff suppressed because it is too large
Load Diff
1
tox.ini
1
tox.ini
@ -65,6 +65,7 @@ commands =
|
||||
test/unit/common/middleware/test_ratelimit.py \
|
||||
test/unit/common/middleware/test_read_only.py \
|
||||
test/unit/common/middleware/test_recon.py \
|
||||
test/unit/common/middleware/test_slo.py \
|
||||
test/unit/common/middleware/test_subrequest_logging.py \
|
||||
test/unit/common/middleware/test_staticweb.py \
|
||||
test/unit/common/middleware/test_tempauth.py \
|
||||
|
Loading…
Reference in New Issue
Block a user