Merge "Support long-running multipart uploads"

This commit is contained in:
Zuul 2018-12-08 12:53:26 +00:00 committed by Gerrit Code Review
commit 6edd70bb13
7 changed files with 252 additions and 64 deletions

View File

@ -64,7 +64,7 @@ import os
import re import re
from swift.common.swob import Range from swift.common.swob import Range
from swift.common.utils import json, public from swift.common.utils import json, public, reiterate
from swift.common.db import utf8encode from swift.common.db import utf8encode
from six.moves.urllib.parse import quote, urlparse from six.moves.urllib.parse import quote, urlparse
@ -540,7 +540,7 @@ class UploadController(Controller):
""" """
upload_id = req.params['uploadId'] upload_id = req.params['uploadId']
resp = _get_upload_info(req, self.app, upload_id) resp = _get_upload_info(req, self.app, upload_id)
headers = {} headers = {'Accept': 'application/json'}
for key, val in resp.headers.items(): for key, val in resp.headers.items():
_key = key.lower() _key = key.lower()
if _key.startswith('x-amz-meta-'): if _key.startswith('x-amz-meta-'):
@ -609,6 +609,7 @@ class UploadController(Controller):
info['size_bytes'] = int(info['size_bytes']) info['size_bytes'] = int(info['size_bytes'])
manifest.append(info) manifest.append(info)
except (XMLSyntaxError, DocumentInvalid): except (XMLSyntaxError, DocumentInvalid):
# NB: our schema definitions catch uploads with no parts here
raise MalformedXML() raise MalformedXML()
except ErrorResponse: except ErrorResponse:
raise raise
@ -628,58 +629,94 @@ class UploadController(Controller):
if info['size_bytes'] < self.conf.min_segment_size: if info['size_bytes'] < self.conf.min_segment_size:
raise EntityTooSmall() raise EntityTooSmall()
try: def response_iter():
# TODO: add support for versioning # NB: XML requires that the XML declaration, if present, be at the
if manifest: # very start of the document. Clients *will* call us out on not
resp = req.get_response(self.app, 'PUT', # being valid XML if we pass through whitespace before it.
body=json.dumps(manifest), # Track whether we've sent anything yet so we can yield out that
query={'multipart-manifest': 'put'}, # declaration *first*
headers=headers) yielded_anything = False
else:
# the upload must have consisted of a single zero-length part
# just write it directly
resp = req.get_response(self.app, 'PUT', body='',
headers=headers)
except BadSwiftRequest as e:
msg = str(e)
expected_msg = 'too small; each segment must be at least 1 byte'
if expected_msg in msg:
# FIXME: AWS S3 allows a smaller object than 5 MB if there is
# only one part. Use a COPY request to copy the part object
# from the segments container instead.
raise EntityTooSmall(msg)
else:
raise
# clean up the multipart-upload record try:
obj = '%s/%s' % (req.object_name, upload_id) try:
try: # TODO: add support for versioning
req.get_response(self.app, 'DELETE', container, obj) put_resp = req.get_response(
except NoSuchKey: self.app, 'PUT', body=json.dumps(manifest),
pass # We know that this existed long enough for us to HEAD query={'multipart-manifest': 'put',
'heartbeat': 'on'},
headers=headers)
if put_resp.status_int == 202:
body = []
put_resp.fix_conditional_response()
for chunk in put_resp.response_iter:
if not chunk.strip():
if not yielded_anything:
yield ('<?xml version="1.0" '
'encoding="UTF-8"?>\n')
yielded_anything = True
yield chunk
body.append(chunk)
body = json.loads(''.join(body))
if body['Response Status'] != '201 Created':
raise InvalidRequest(
status=body['Response Status'],
msg='\n'.join(': '.join(err)
for err in body['Errors']))
except BadSwiftRequest as e:
msg = str(e)
expected_msg = ('too small; each segment must be '
'at least 1 byte')
if expected_msg in msg:
# FIXME: AWS S3 allows a smaller object than 5 MB if
# there is only one part. Use a COPY request to copy
# the part object from the segments container instead.
raise EntityTooSmall(msg)
else:
raise
result_elem = Element('CompleteMultipartUploadResult') # clean up the multipart-upload record
obj = '%s/%s' % (req.object_name, upload_id)
try:
req.get_response(self.app, 'DELETE', container, obj)
except NoSuchKey:
# We know that this existed long enough for us to HEAD
pass
# NOTE: boto with sig v4 appends port to HTTP_HOST value at the result_elem = Element('CompleteMultipartUploadResult')
# request header when the port is non default value and it makes
# req.host_url like as http://localhost:8080:8080/path
# that obviously invalid. Probably it should be resolved at
# swift.common.swob though, tentatively we are parsing and
# reconstructing the correct host_url info here.
# in detail, https://github.com/boto/boto/pull/3513
parsed_url = urlparse(req.host_url)
host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
if parsed_url.port:
host_url += ':%s' % parsed_url.port
SubElement(result_elem, 'Location').text = host_url + req.path # NOTE: boto with sig v4 appends port to HTTP_HOST value at
SubElement(result_elem, 'Bucket').text = req.container_name # the request header when the port is non default value and it
SubElement(result_elem, 'Key').text = req.object_name # makes req.host_url like as http://localhost:8080:8080/path
SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag # that obviously invalid. Probably it should be resolved at
del resp.headers['ETag'] # swift.common.swob though, tentatively we are parsing and
# reconstructing the correct host_url info here.
# in detail, https://github.com/boto/boto/pull/3513
parsed_url = urlparse(req.host_url)
host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
if parsed_url.port:
host_url += ':%s' % parsed_url.port
resp.body = tostring(result_elem) SubElement(result_elem, 'Location').text = host_url + req.path
resp.status = 200 SubElement(result_elem, 'Bucket').text = req.container_name
SubElement(result_elem, 'Key').text = req.object_name
SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag
resp.headers.pop('ETag', None)
if yielded_anything:
yield '\n'
yield tostring(result_elem,
xml_declaration=not yielded_anything)
except ErrorResponse as err_resp:
if yielded_anything:
err_resp.xml_declaration = False
yield '\n'
else:
# Oh good, we can still change HTTP status code, too!
resp.status = err_resp.status
for chunk in err_resp({}, lambda *a: None):
yield chunk
resp = HTTPOk() # assume we're good for now... but see above!
resp.app_iter = reiterate(response_iter())
resp.content_type = "application/xml" resp.content_type = "application/xml"
return resp return resp

View File

@ -85,7 +85,7 @@ def fromstring(text, root_tag=None, logger=None):
return elem return elem
def tostring(tree, use_s3ns=True): def tostring(tree, use_s3ns=True, xml_declaration=True):
if use_s3ns: if use_s3ns:
nsmap = tree.nsmap.copy() nsmap = tree.nsmap.copy()
nsmap[None] = XMLNS_S3 nsmap[None] = XMLNS_S3
@ -95,7 +95,8 @@ def tostring(tree, use_s3ns=True):
root.extend(deepcopy(tree.getchildren())) root.extend(deepcopy(tree.getchildren()))
tree = root tree = root
return lxml.etree.tostring(tree, xml_declaration=True, encoding='UTF-8') return lxml.etree.tostring(tree, xml_declaration=xml_declaration,
encoding='UTF-8')
class _Element(lxml.etree.ElementBase): class _Element(lxml.etree.ElementBase):

View File

@ -1155,6 +1155,7 @@ class S3Request(swob.Request):
], ],
'PUT': [ 'PUT': [
HTTP_CREATED, HTTP_CREATED,
HTTP_ACCEPTED, # For SLO with heartbeating
], ],
'POST': [ 'POST': [
HTTP_ACCEPTED, HTTP_ACCEPTED,

View File

@ -200,6 +200,7 @@ class ErrorResponse(S3ResponseBase, swob.HTTPException):
_status = '' _status = ''
_msg = '' _msg = ''
_code = '' _code = ''
xml_declaration = True
def __init__(self, msg=None, *args, **kwargs): def __init__(self, msg=None, *args, **kwargs):
if msg: if msg:
@ -212,10 +213,11 @@ class ErrorResponse(S3ResponseBase, swob.HTTPException):
if self.info.get(reserved_key): if self.info.get(reserved_key):
del(self.info[reserved_key]) del(self.info[reserved_key])
swob.HTTPException.__init__(self, status=self._status, swob.HTTPException.__init__(
app_iter=self._body_iter(), self, status=kwargs.pop('status', self._status),
content_type='application/xml', *args, app_iter=self._body_iter(),
**kwargs) content_type='application/xml', *args,
**kwargs)
self.headers = HeaderKeyDict(self.headers) self.headers = HeaderKeyDict(self.headers)
def _body_iter(self): def _body_iter(self):
@ -228,7 +230,8 @@ class ErrorResponse(S3ResponseBase, swob.HTTPException):
self._dict_to_etree(error_elem, self.info) self._dict_to_etree(error_elem, self.info)
yield tostring(error_elem, use_s3ns=False) yield tostring(error_elem, use_s3ns=False,
xml_declaration=self.xml_declaration)
def _dict_to_etree(self, parent, d): def _dict_to_etree(self, parent, d):
for key, value in d.items(): for key, value in d.items():

View File

@ -303,8 +303,14 @@ class TestS3ApiMultiUpload(S3ApiBase):
self.assertCommonResponseHeaders(headers) self.assertCommonResponseHeaders(headers)
self.assertTrue('content-type' in headers) self.assertTrue('content-type' in headers)
self.assertEqual(headers['content-type'], 'application/xml') self.assertEqual(headers['content-type'], 'application/xml')
self.assertTrue('content-length' in headers) if 'content-length' in headers:
self.assertEqual(headers['content-length'], str(len(body))) self.assertEqual(headers['content-length'], str(len(body)))
else:
self.assertIn('transfer-encoding', headers)
self.assertEqual(headers['transfer-encoding'], 'chunked')
lines = body.split('\n')
self.assertTrue(lines[0].startswith('<?xml'), body)
self.assertTrue(lines[0].endswith('?>'), body)
elem = fromstring(body, 'CompleteMultipartUploadResult') elem = fromstring(body, 'CompleteMultipartUploadResult')
# TODO: use tf.config value # TODO: use tf.config value
self.assertEqual( self.assertEqual(

View File

@ -128,8 +128,14 @@ class FakeSwift(object):
method == 'PUT' and method == 'PUT' and
'X-Copy-From' in req.headers and 'X-Copy-From' in req.headers and
'Range' in req.headers) 'Range' in req.headers)
resp = resp_class(req=req, headers=headers, body=body, if isinstance(body, list):
conditional_response=support_range_and_conditional) app_iter = body
body = None
else:
app_iter = None
resp = resp_class(
req=req, headers=headers, body=body, app_iter=app_iter,
conditional_response=support_range_and_conditional)
return resp(env, start_response) return resp(env, start_response)
@property @property

View File

@ -155,6 +155,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
'Date': self.get_date_header()}) 'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req) status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidRequest') self.assertEqual(self._get_error_code(body), 'InvalidRequest')
self.assertEqual(self._get_error_message(body),
'A key must be specified')
@s3acl @s3acl
def test_bucket_multipart_uploads_complete(self): def test_bucket_multipart_uploads_complete(self):
@ -684,7 +686,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
('GET', '/v1/AUTH_test/bucket+segments?delimiter=/' ('GET', '/v1/AUTH_test/bucket+segments?delimiter=/'
'&format=json&prefix=object/X/'), '&format=json&prefix=object/X/'),
# Create the SLO # Create the SLO
('PUT', '/v1/AUTH_test/bucket/object?multipart-manifest=put'), ('PUT', '/v1/AUTH_test/bucket/object'
'?heartbeat=on&multipart-manifest=put'),
# Delete the in-progress-upload marker # Delete the in-progress-upload marker
('DELETE', '/v1/AUTH_test/bucket+segments/object/X') ('DELETE', '/v1/AUTH_test/bucket+segments/object/X')
]) ])
@ -697,6 +700,94 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
h = 'X-Object-Sysmeta-Container-Update-Override-Etag' h = 'X-Object-Sysmeta-Container-Update-Override-Etag'
self.assertEqual(headers.get(h), override_etag) self.assertEqual(headers.get(h), override_etag)
def test_object_multipart_upload_complete_with_heartbeat(self):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X',
swob.HTTPOk, {}, None)
self.swift.register(
'GET', '/v1/AUTH_test/bucket+segments', swob.HTTPOk, {},
json.dumps([
{'name': item[0].replace('object', 'heartbeat-ok'),
'last_modified': item[1], 'hash': item[2], 'bytes': item[3]}
for item in objects_template
]))
self.swift.register(
'PUT', '/v1/AUTH_test/bucket/heartbeat-ok',
swob.HTTPAccepted, {}, [' ', ' ', ' ', json.dumps({
'Etag': '"slo-etag"',
'Response Status': '201 Created',
'Errors': [],
})])
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X',
swob.HTTPNoContent, {}, None)
req = Request.blank('/bucket/heartbeat-ok?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(), },
body=xml)
status, headers, body = self.call_s3api(req)
lines = body.split('\n')
self.assertTrue(lines[0].startswith('<?xml '))
self.assertTrue(lines[1])
self.assertFalse(lines[1].strip())
fromstring(body, 'CompleteMultipartUploadResult')
self.assertEqual(status.split()[0], '200')
# NB: s3_etag includes quotes
self.assertIn('<ETag>%s</ETag>' % s3_etag, body)
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/bucket'),
('HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X'),
('GET', '/v1/AUTH_test/bucket+segments?'
'delimiter=/&format=json&prefix=heartbeat-ok/X/'),
('PUT', '/v1/AUTH_test/bucket/heartbeat-ok?'
'heartbeat=on&multipart-manifest=put'),
('DELETE', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X'),
])
def test_object_multipart_upload_complete_failure_with_heartbeat(self):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X',
swob.HTTPOk, {}, None)
self.swift.register(
'GET', '/v1/AUTH_test/bucket+segments', swob.HTTPOk, {},
json.dumps([
{'name': item[0].replace('object', 'heartbeat-fail'),
'last_modified': item[1], 'hash': item[2], 'bytes': item[3]}
for item in objects_template
]))
self.swift.register(
'PUT', '/v1/AUTH_test/bucket/heartbeat-fail',
swob.HTTPAccepted, {}, [' ', ' ', ' ', json.dumps({
'Response Status': '400 Bad Request',
'Errors': [['some/object', '404 Not Found']],
})])
req = Request.blank('/bucket/heartbeat-fail?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(), },
body=xml)
status, headers, body = self.call_s3api(req)
lines = body.split('\n')
self.assertTrue(lines[0].startswith('<?xml '))
self.assertTrue(lines[1])
self.assertFalse(lines[1].strip())
fromstring(body, 'Error')
self.assertEqual(status.split()[0], '200')
self.assertEqual(self._get_error_code(body), 'InvalidRequest')
self.assertEqual(self._get_error_message(body),
'some/object: 404 Not Found')
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/bucket'),
('HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X'),
('GET', '/v1/AUTH_test/bucket+segments?'
'delimiter=/&format=json&prefix=heartbeat-fail/X/'),
('PUT', '/v1/AUTH_test/bucket/heartbeat-fail?'
'heartbeat=on&multipart-manifest=put'),
])
def test_object_multipart_upload_complete_404_on_marker_delete(self): def test_object_multipart_upload_complete_404_on_marker_delete(self):
segment_bucket = '/v1/AUTH_test/bucket+segments' segment_bucket = '/v1/AUTH_test/bucket+segments'
self.swift.register('DELETE', segment_bucket + '/object/X', self.swift.register('DELETE', segment_bucket + '/object/X',
@ -798,6 +889,48 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
'allowed object size.') 'allowed object size.')
self.assertNotIn('PUT', [method for method, _ in self.swift.calls]) self.assertNotIn('PUT', [method for method, _ in self.swift.calls])
def test_object_multipart_upload_complete_zero_segments(self):
segment_bucket = '/v1/AUTH_test/empty-bucket+segments'
object_list = [{
'name': 'object/X/1',
'last_modified': self.last_modified,
'hash': 'd41d8cd98f00b204e9800998ecf8427e',
'bytes': '0',
}]
self.swift.register('GET', segment_bucket, swob.HTTPOk, {},
json.dumps(object_list))
self.swift.register('HEAD', '/v1/AUTH_test/empty-bucket',
swob.HTTPNoContent, {}, None)
self.swift.register('HEAD', segment_bucket + '/object/X',
swob.HTTPOk, {'x-object-meta-foo': 'bar',
'content-type': 'baz/quux'}, None)
self.swift.register('PUT', '/v1/AUTH_test/empty-bucket/object',
swob.HTTPCreated, {}, None)
self.swift.register('DELETE', segment_bucket + '/object/X/1',
swob.HTTPOk, {}, None)
self.swift.register('DELETE', segment_bucket + '/object/X',
swob.HTTPOk, {}, None)
xml = '<CompleteMultipartUpload></CompleteMultipartUpload>'
req = Request.blank('/empty-bucket/object?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(), },
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
fromstring(body, 'Error')
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/empty-bucket'),
('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'),
('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'),
])
def test_object_multipart_upload_complete_single_zero_length_segment(self): def test_object_multipart_upload_complete_single_zero_length_segment(self):
segment_bucket = '/v1/AUTH_test/empty-bucket+segments' segment_bucket = '/v1/AUTH_test/empty-bucket+segments'
put_headers = {'etag': self.etag, 'last-modified': self.last_modified} put_headers = {'etag': self.etag, 'last-modified': self.last_modified}
@ -844,8 +977,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'), ('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'),
('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&' ('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'), 'format=json&prefix=object/X/'),
('PUT', ('PUT', '/v1/AUTH_test/empty-bucket/object?'
'/v1/AUTH_test/empty-bucket/object?multipart-manifest=put'), 'heartbeat=on&multipart-manifest=put'),
('DELETE', '/v1/AUTH_test/empty-bucket+segments/object/X'), ('DELETE', '/v1/AUTH_test/empty-bucket+segments/object/X'),
]) ])
_, _, put_headers = self.swift.calls_with_headers[-2] _, _, put_headers = self.swift.calls_with_headers[-2]
@ -965,7 +1098,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
('GET', '/v1/AUTH_test/bucket+segments?delimiter=/&' ('GET', '/v1/AUTH_test/bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'), 'format=json&prefix=object/X/'),
('PUT', '/v1/AUTH_test/bucket/object?multipart-manifest=put'), ('PUT', '/v1/AUTH_test/bucket/object?'
'heartbeat=on&multipart-manifest=put'),
('DELETE', '/v1/AUTH_test/bucket+segments/object/X'), ('DELETE', '/v1/AUTH_test/bucket+segments/object/X'),
]) ])