From b7e22788607fb1622a5b89ee33051bec843271f8 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Fri, 15 Jun 2018 11:37:59 -0700 Subject: [PATCH] Support long-running multipart uploads S3 docs say: > Processing of a Complete Multipart Upload request could > take several minutes to complete. After Amazon S3 begins > processing the request, it sends an HTTP response header > that specifies a 200 OK response. While processing is in > progress, Amazon S3 periodically sends whitespace > characters to keep the connection from timing out. Because > a request could fail after the initial 200 OK response has > been sent, it is important that you check the response > body to determine whether the request succeeded. Let's do that, too! Change-Id: Iaf420983c41256ee9a4c43cfd74025d2ca069ae6 Closes-Bug: 1718811 Related-Change: I65cee5f629c87364e188aa05a06d563c3849c8f3 --- .../s3api/controllers/multi_upload.py | 135 +++++++++++------ swift/common/middleware/s3api/etree.py | 5 +- swift/common/middleware/s3api/s3request.py | 1 + swift/common/middleware/s3api/s3response.py | 13 +- test/functional/s3api/test_multi_upload.py | 10 +- test/unit/common/middleware/s3api/helpers.py | 10 +- .../middleware/s3api/test_multi_upload.py | 142 +++++++++++++++++- 7 files changed, 252 insertions(+), 64 deletions(-) diff --git a/swift/common/middleware/s3api/controllers/multi_upload.py b/swift/common/middleware/s3api/controllers/multi_upload.py index 51586bad0e..5066526f22 100644 --- a/swift/common/middleware/s3api/controllers/multi_upload.py +++ b/swift/common/middleware/s3api/controllers/multi_upload.py @@ -64,7 +64,7 @@ import os import re from swift.common.swob import Range -from swift.common.utils import json, public +from swift.common.utils import json, public, reiterate from swift.common.db import utf8encode from six.moves.urllib.parse import quote, urlparse @@ -540,7 +540,7 @@ class UploadController(Controller): """ upload_id = req.params['uploadId'] resp = _get_upload_info(req, self.app, upload_id) - headers = {} + headers = {'Accept': 'application/json'} for key, val in resp.headers.items(): _key = key.lower() if _key.startswith('x-amz-meta-'): @@ -609,6 +609,7 @@ class UploadController(Controller): info['size_bytes'] = int(info['size_bytes']) manifest.append(info) except (XMLSyntaxError, DocumentInvalid): + # NB: our schema definitions catch uploads with no parts here raise MalformedXML() except ErrorResponse: raise @@ -628,58 +629,94 @@ class UploadController(Controller): if info['size_bytes'] < self.conf.min_segment_size: raise EntityTooSmall() - try: - # TODO: add support for versioning - if manifest: - resp = req.get_response(self.app, 'PUT', - body=json.dumps(manifest), - query={'multipart-manifest': 'put'}, - headers=headers) - else: - # the upload must have consisted of a single zero-length part - # just write it directly - resp = req.get_response(self.app, 'PUT', body='', - headers=headers) - except BadSwiftRequest as e: - msg = str(e) - expected_msg = 'too small; each segment must be at least 1 byte' - if expected_msg in msg: - # FIXME: AWS S3 allows a smaller object than 5 MB if there is - # only one part. Use a COPY request to copy the part object - # from the segments container instead. - raise EntityTooSmall(msg) - else: - raise + def response_iter(): + # NB: XML requires that the XML declaration, if present, be at the + # very start of the document. Clients *will* call us out on not + # being valid XML if we pass through whitespace before it. + # Track whether we've sent anything yet so we can yield out that + # declaration *first* + yielded_anything = False - # clean up the multipart-upload record - obj = '%s/%s' % (req.object_name, upload_id) - try: - req.get_response(self.app, 'DELETE', container, obj) - except NoSuchKey: - pass # We know that this existed long enough for us to HEAD + try: + try: + # TODO: add support for versioning + put_resp = req.get_response( + self.app, 'PUT', body=json.dumps(manifest), + query={'multipart-manifest': 'put', + 'heartbeat': 'on'}, + headers=headers) + if put_resp.status_int == 202: + body = [] + put_resp.fix_conditional_response() + for chunk in put_resp.response_iter: + if not chunk.strip(): + if not yielded_anything: + yield ('\n') + yielded_anything = True + yield chunk + body.append(chunk) + body = json.loads(''.join(body)) + if body['Response Status'] != '201 Created': + raise InvalidRequest( + status=body['Response Status'], + msg='\n'.join(': '.join(err) + for err in body['Errors'])) + except BadSwiftRequest as e: + msg = str(e) + expected_msg = ('too small; each segment must be ' + 'at least 1 byte') + if expected_msg in msg: + # FIXME: AWS S3 allows a smaller object than 5 MB if + # there is only one part. Use a COPY request to copy + # the part object from the segments container instead. + raise EntityTooSmall(msg) + else: + raise - result_elem = Element('CompleteMultipartUploadResult') + # clean up the multipart-upload record + obj = '%s/%s' % (req.object_name, upload_id) + try: + req.get_response(self.app, 'DELETE', container, obj) + except NoSuchKey: + # We know that this existed long enough for us to HEAD + pass - # NOTE: boto with sig v4 appends port to HTTP_HOST value at the - # request header when the port is non default value and it makes - # req.host_url like as http://localhost:8080:8080/path - # that obviously invalid. Probably it should be resolved at - # swift.common.swob though, tentatively we are parsing and - # reconstructing the correct host_url info here. - # in detail, https://github.com/boto/boto/pull/3513 - parsed_url = urlparse(req.host_url) - host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname) - if parsed_url.port: - host_url += ':%s' % parsed_url.port + result_elem = Element('CompleteMultipartUploadResult') - SubElement(result_elem, 'Location').text = host_url + req.path - SubElement(result_elem, 'Bucket').text = req.container_name - SubElement(result_elem, 'Key').text = req.object_name - SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag - del resp.headers['ETag'] + # NOTE: boto with sig v4 appends port to HTTP_HOST value at + # the request header when the port is non default value and it + # makes req.host_url like as http://localhost:8080:8080/path + # that obviously invalid. Probably it should be resolved at + # swift.common.swob though, tentatively we are parsing and + # reconstructing the correct host_url info here. + # in detail, https://github.com/boto/boto/pull/3513 + parsed_url = urlparse(req.host_url) + host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname) + if parsed_url.port: + host_url += ':%s' % parsed_url.port - resp.body = tostring(result_elem) - resp.status = 200 + SubElement(result_elem, 'Location').text = host_url + req.path + SubElement(result_elem, 'Bucket').text = req.container_name + SubElement(result_elem, 'Key').text = req.object_name + SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag + resp.headers.pop('ETag', None) + if yielded_anything: + yield '\n' + yield tostring(result_elem, + xml_declaration=not yielded_anything) + except ErrorResponse as err_resp: + if yielded_anything: + err_resp.xml_declaration = False + yield '\n' + else: + # Oh good, we can still change HTTP status code, too! + resp.status = err_resp.status + for chunk in err_resp({}, lambda *a: None): + yield chunk + + resp = HTTPOk() # assume we're good for now... but see above! + resp.app_iter = reiterate(response_iter()) resp.content_type = "application/xml" return resp diff --git a/swift/common/middleware/s3api/etree.py b/swift/common/middleware/s3api/etree.py index ac4997c692..dcdd7f616d 100644 --- a/swift/common/middleware/s3api/etree.py +++ b/swift/common/middleware/s3api/etree.py @@ -85,7 +85,7 @@ def fromstring(text, root_tag=None, logger=None): return elem -def tostring(tree, use_s3ns=True): +def tostring(tree, use_s3ns=True, xml_declaration=True): if use_s3ns: nsmap = tree.nsmap.copy() nsmap[None] = XMLNS_S3 @@ -95,7 +95,8 @@ def tostring(tree, use_s3ns=True): root.extend(deepcopy(tree.getchildren())) tree = root - return lxml.etree.tostring(tree, xml_declaration=True, encoding='UTF-8') + return lxml.etree.tostring(tree, xml_declaration=xml_declaration, + encoding='UTF-8') class _Element(lxml.etree.ElementBase): diff --git a/swift/common/middleware/s3api/s3request.py b/swift/common/middleware/s3api/s3request.py index 03894a09e0..368d249d8e 100644 --- a/swift/common/middleware/s3api/s3request.py +++ b/swift/common/middleware/s3api/s3request.py @@ -1146,6 +1146,7 @@ class S3Request(swob.Request): ], 'PUT': [ HTTP_CREATED, + HTTP_ACCEPTED, # For SLO with heartbeating ], 'POST': [ HTTP_ACCEPTED, diff --git a/swift/common/middleware/s3api/s3response.py b/swift/common/middleware/s3api/s3response.py index fea4a8057b..5b23a8a92d 100644 --- a/swift/common/middleware/s3api/s3response.py +++ b/swift/common/middleware/s3api/s3response.py @@ -194,6 +194,7 @@ class ErrorResponse(S3ResponseBase, swob.HTTPException): _status = '' _msg = '' _code = '' + xml_declaration = True def __init__(self, msg=None, *args, **kwargs): if msg: @@ -206,10 +207,11 @@ class ErrorResponse(S3ResponseBase, swob.HTTPException): if self.info.get(reserved_key): del(self.info[reserved_key]) - swob.HTTPException.__init__(self, status=self._status, - app_iter=self._body_iter(), - content_type='application/xml', *args, - **kwargs) + swob.HTTPException.__init__( + self, status=kwargs.pop('status', self._status), + app_iter=self._body_iter(), + content_type='application/xml', *args, + **kwargs) self.headers = HeaderKeyDict(self.headers) def _body_iter(self): @@ -222,7 +224,8 @@ class ErrorResponse(S3ResponseBase, swob.HTTPException): self._dict_to_etree(error_elem, self.info) - yield tostring(error_elem, use_s3ns=False) + yield tostring(error_elem, use_s3ns=False, + xml_declaration=self.xml_declaration) def _dict_to_etree(self, parent, d): for key, value in d.items(): diff --git a/test/functional/s3api/test_multi_upload.py b/test/functional/s3api/test_multi_upload.py index 8149164f09..f638181b2a 100644 --- a/test/functional/s3api/test_multi_upload.py +++ b/test/functional/s3api/test_multi_upload.py @@ -303,8 +303,14 @@ class TestS3ApiMultiUpload(S3ApiBase): self.assertCommonResponseHeaders(headers) self.assertTrue('content-type' in headers) self.assertEqual(headers['content-type'], 'application/xml') - self.assertTrue('content-length' in headers) - self.assertEqual(headers['content-length'], str(len(body))) + if 'content-length' in headers: + self.assertEqual(headers['content-length'], str(len(body))) + else: + self.assertIn('transfer-encoding', headers) + self.assertEqual(headers['transfer-encoding'], 'chunked') + lines = body.split('\n') + self.assertTrue(lines[0].startswith(''), body) elem = fromstring(body, 'CompleteMultipartUploadResult') # TODO: use tf.config value self.assertEqual( diff --git a/test/unit/common/middleware/s3api/helpers.py b/test/unit/common/middleware/s3api/helpers.py index b38c1e5f02..f10b98e7fc 100644 --- a/test/unit/common/middleware/s3api/helpers.py +++ b/test/unit/common/middleware/s3api/helpers.py @@ -128,8 +128,14 @@ class FakeSwift(object): method == 'PUT' and 'X-Copy-From' in req.headers and 'Range' in req.headers) - resp = resp_class(req=req, headers=headers, body=body, - conditional_response=support_range_and_conditional) + if isinstance(body, list): + app_iter = body + body = None + else: + app_iter = None + resp = resp_class( + req=req, headers=headers, body=body, app_iter=app_iter, + conditional_response=support_range_and_conditional) return resp(env, start_response) @property diff --git a/test/unit/common/middleware/s3api/test_multi_upload.py b/test/unit/common/middleware/s3api/test_multi_upload.py index d6c1b61c0d..819de41801 100644 --- a/test/unit/common/middleware/s3api/test_multi_upload.py +++ b/test/unit/common/middleware/s3api/test_multi_upload.py @@ -155,6 +155,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase): 'Date': self.get_date_header()}) status, headers, body = self.call_s3api(req) self.assertEqual(self._get_error_code(body), 'InvalidRequest') + self.assertEqual(self._get_error_message(body), + 'A key must be specified') @s3acl def test_bucket_multipart_uploads_complete(self): @@ -684,7 +686,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase): ('GET', '/v1/AUTH_test/bucket+segments?delimiter=/' '&format=json&prefix=object/X/'), # Create the SLO - ('PUT', '/v1/AUTH_test/bucket/object?multipart-manifest=put'), + ('PUT', '/v1/AUTH_test/bucket/object' + '?heartbeat=on&multipart-manifest=put'), # Delete the in-progress-upload marker ('DELETE', '/v1/AUTH_test/bucket+segments/object/X') ]) @@ -697,6 +700,94 @@ class TestS3ApiMultiUpload(S3ApiTestCase): h = 'X-Object-Sysmeta-Container-Update-Override-Etag' self.assertEqual(headers.get(h), override_etag) + def test_object_multipart_upload_complete_with_heartbeat(self): + self.swift.register( + 'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X', + swob.HTTPOk, {}, None) + self.swift.register( + 'GET', '/v1/AUTH_test/bucket+segments', swob.HTTPOk, {}, + json.dumps([ + {'name': item[0].replace('object', 'heartbeat-ok'), + 'last_modified': item[1], 'hash': item[2], 'bytes': item[3]} + for item in objects_template + ])) + self.swift.register( + 'PUT', '/v1/AUTH_test/bucket/heartbeat-ok', + swob.HTTPAccepted, {}, [' ', ' ', ' ', json.dumps({ + 'Etag': '"slo-etag"', + 'Response Status': '201 Created', + 'Errors': [], + })]) + self.swift.register( + 'DELETE', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X', + swob.HTTPNoContent, {}, None) + + req = Request.blank('/bucket/heartbeat-ok?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), }, + body=xml) + status, headers, body = self.call_s3api(req) + lines = body.split('\n') + self.assertTrue(lines[0].startswith('%s' % s3_etag, body) + self.assertEqual(self.swift.calls, [ + ('HEAD', '/v1/AUTH_test/bucket'), + ('HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X'), + ('GET', '/v1/AUTH_test/bucket+segments?' + 'delimiter=/&format=json&prefix=heartbeat-ok/X/'), + ('PUT', '/v1/AUTH_test/bucket/heartbeat-ok?' + 'heartbeat=on&multipart-manifest=put'), + ('DELETE', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X'), + ]) + + def test_object_multipart_upload_complete_failure_with_heartbeat(self): + self.swift.register( + 'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X', + swob.HTTPOk, {}, None) + self.swift.register( + 'GET', '/v1/AUTH_test/bucket+segments', swob.HTTPOk, {}, + json.dumps([ + {'name': item[0].replace('object', 'heartbeat-fail'), + 'last_modified': item[1], 'hash': item[2], 'bytes': item[3]} + for item in objects_template + ])) + self.swift.register( + 'PUT', '/v1/AUTH_test/bucket/heartbeat-fail', + swob.HTTPAccepted, {}, [' ', ' ', ' ', json.dumps({ + 'Response Status': '400 Bad Request', + 'Errors': [['some/object', '404 Not Found']], + })]) + + req = Request.blank('/bucket/heartbeat-fail?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), }, + body=xml) + status, headers, body = self.call_s3api(req) + lines = body.split('\n') + self.assertTrue(lines[0].startswith('' + + req = Request.blank('/empty-bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), }, + body=xml) + status, headers, body = self.call_s3api(req) + self.assertEqual(status.split()[0], '400') + fromstring(body, 'Error') + + self.assertEqual(self.swift.calls, [ + ('HEAD', '/v1/AUTH_test/empty-bucket'), + ('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'), + ('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&' + 'format=json&prefix=object/X/'), + ]) + def test_object_multipart_upload_complete_single_zero_length_segment(self): segment_bucket = '/v1/AUTH_test/empty-bucket+segments' put_headers = {'etag': self.etag, 'last-modified': self.last_modified} @@ -844,8 +977,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase): ('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'), ('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&' 'format=json&prefix=object/X/'), - ('PUT', - '/v1/AUTH_test/empty-bucket/object?multipart-manifest=put'), + ('PUT', '/v1/AUTH_test/empty-bucket/object?' + 'heartbeat=on&multipart-manifest=put'), ('DELETE', '/v1/AUTH_test/empty-bucket+segments/object/X'), ]) _, _, put_headers = self.swift.calls_with_headers[-2] @@ -965,7 +1098,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase): ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), ('GET', '/v1/AUTH_test/bucket+segments?delimiter=/&' 'format=json&prefix=object/X/'), - ('PUT', '/v1/AUTH_test/bucket/object?multipart-manifest=put'), + ('PUT', '/v1/AUTH_test/bucket/object?' + 'heartbeat=on&multipart-manifest=put'), ('DELETE', '/v1/AUTH_test/bucket+segments/object/X'), ])