s3api: Allow CompleteMultipartUpload requests to be retried
When completing a multipart upload, include the upload-id in sysmeta. If we can't find the upload marker, check the final object: if it has an upload-id in sysmeta that matches the upload-id we're trying to complete, allow the complete to continue.

Also add an early return if the already-completed upload's ETag matches the computed ETag for the user's request. This should help clients that can't take advantage of how we dribble out whitespace to try to keep the connection alive: the client times out, retries, and if the upload actually completed, it gets a fast 200 response.

Change-Id: I38958839be5b250c9d268ec7c50a56cdb56c2fa2
parent a47d5a1152
commit 02548717ac
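To see why retryability matters from the client's side, here is a rough sketch of the failure mode the commit message describes. The endpoint, credentials, timeout, and retry loop are illustrative assumptions; only complete_multipart_upload itself is real boto3 API:

    # Hedged sketch: CompleteMultipartUpload can run long enough that the
    # client gives up; with this change, a retry of an already-finished
    # complete gets a fast 200 instead of a 404 NoSuchUpload.
    import boto3
    from botocore.config import Config
    from botocore.exceptions import ReadTimeoutError

    s3 = boto3.client(
        's3', endpoint_url='http://localhost:8080',    # assumed endpoint
        aws_access_key_id='test:tester',               # assumed credentials
        aws_secret_access_key='testing',
        config=Config(read_timeout=30, retries={'max_attempts': 0}))

    def complete_with_retries(bucket, key, upload_id, parts, attempts=3):
        for attempt in range(attempts):
            try:
                return s3.complete_multipart_upload(
                    Bucket=bucket, Key=key, UploadId=upload_id,
                    MultipartUpload={'Parts': parts})
            except ReadTimeoutError:
                if attempt == attempts - 1:
                    raise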
@@ -108,6 +108,13 @@ def _get_upload_info(req, app, upload_id):
     try:
         return req.get_response(app, 'HEAD', container=container, obj=obj)
     except NoSuchKey:
+        try:
+            resp = req.get_response(app, 'HEAD')
+            if resp.sysmeta_headers.get(sysmeta_header(
+                    'object', 'upload-id')) == upload_id:
+                return resp
+        except NoSuchKey:
+            pass
         raise NoSuchUpload(upload_id=upload_id)
     finally:
         # ...making sure to restore any copy-source before returning
@@ -115,9 +122,34 @@ def _get_upload_info(req, app, upload_id):
             req.headers['X-Amz-Copy-Source'] = copy_source
 
 
-def _check_upload_info(req, app, upload_id):
+def _make_complete_body(req, s3_etag, yielded_anything):
+    result_elem = Element('CompleteMultipartUploadResult')
 
-    _get_upload_info(req, app, upload_id)
+    # NOTE: boto with sig v4 appends port to HTTP_HOST value at
+    # the request header when the port is non default value and it
+    # makes req.host_url like as http://localhost:8080:8080/path
+    # that obviously invalid. Probably it should be resolved at
+    # swift.common.swob though, tentatively we are parsing and
+    # reconstructing the correct host_url info here.
+    # in detail, https://github.com/boto/boto/pull/3513
+    parsed_url = urlparse(req.host_url)
+    host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
+    # Why are we doing our own port parsing? Because py3 decided
+    # to start raising ValueErrors on access after parsing such
+    # an invalid port
+    netloc = parsed_url.netloc.split('@')[-1].split(']')[-1]
+    if ':' in netloc:
+        port = netloc.split(':', 2)[1]
+        host_url += ':%s' % port
+
+    SubElement(result_elem, 'Location').text = host_url + req.path
+    SubElement(result_elem, 'Bucket').text = req.container_name
+    SubElement(result_elem, 'Key').text = req.object_name
+    SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag
+    body = tostring(result_elem, xml_declaration=not yielded_anything)
+    if yielded_anything:
+        return b'\n' + body
+    return body
 
 
 class PartController(Controller):
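The port-parsing workaround carried into _make_complete_body is easy to demonstrate standalone: under Python 3, urlparse() accepts boto's doubled port in the netloc but only raises once the port attribute is accessed, which is why the code splits the netloc by hand. A self-contained illustration (not part of the commit):

    from urllib.parse import urlparse

    # The malformed host boto with sig v4 can produce:
    parsed = urlparse('http://localhost:8080:8080/path')
    print(parsed.hostname)       # 'localhost' -- still usable
    try:
        parsed.port              # py3 raises only on attribute access
    except ValueError as err:
        print('invalid port:', err)

    # The manual parse used above:
    netloc = parsed.netloc.split('@')[-1].split(']')[-1]
    if ':' in netloc:
        print(netloc.split(':', 2)[1])   # '8080' -- the first port wins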
@@ -152,7 +184,7 @@ class PartController(Controller):
                                       err_msg)
 
         upload_id = req.params['uploadId']
-        _check_upload_info(req, self.app, upload_id)
+        _get_upload_info(req, self.app, upload_id)
 
         req.container_name += MULTIUPLOAD_SUFFIX
         req.object_name = '%s/%s/%d' % (req.object_name, upload_id,
@@ -449,7 +481,7 @@ class UploadController(Controller):
             raise InvalidArgument('encoding-type', encoding_type, err_msg)
 
         upload_id = req.params['uploadId']
-        _check_upload_info(req, self.app, upload_id)
+        _get_upload_info(req, self.app, upload_id)
 
         maxparts = req.get_validated_param(
             'max-parts', DEFAULT_MAX_PARTS_LISTING,
@@ -536,7 +568,7 @@ class UploadController(Controller):
         Handles Abort Multipart Upload.
         """
         upload_id = req.params['uploadId']
-        _check_upload_info(req, self.app, upload_id)
+        _get_upload_info(req, self.app, upload_id)
 
         # First check to see if this multi-part upload was already
         # completed. Look in the primary container, if the object exists,
@@ -580,7 +612,8 @@ class UploadController(Controller):
         """
         upload_id = req.params['uploadId']
         resp = _get_upload_info(req, self.app, upload_id)
-        headers = {'Accept': 'application/json'}
+        headers = {'Accept': 'application/json',
+                   sysmeta_header('object', 'upload-id'): upload_id}
         for key, val in resp.headers.items():
             _key = key.lower()
             if _key.startswith('x-amz-meta-'):
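For readers outside the s3api tree: sysmeta_header() builds the Swift system-metadata header name that persists s3api state on the object; the real helper lives in swift.common.middleware.s3api.utils. A rough sketch of its observable behavior, inferred from the header names asserted in the unit tests further down:

    # Inferred sketch, not the actual implementation
    def sysmeta_header(resource, name):
        return 'X-%s-Sysmeta-S3Api-%s' % (resource.title(), name.title())

    sysmeta_header('object', 'upload-id')  # 'X-Object-Sysmeta-S3Api-Upload-Id'
    sysmeta_header('object', 'etag')       # 'X-Object-Sysmeta-S3Api-Etag'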
@@ -650,7 +683,15 @@ class UploadController(Controller):
                 raise
 
             s3_etag = '%s-%d' % (s3_etag_hasher.hexdigest(), len(manifest))
-            headers[sysmeta_header('object', 'etag')] = s3_etag
+            s3_etag_header = sysmeta_header('object', 'etag')
+            if resp.sysmeta_headers.get(s3_etag_header) == s3_etag:
+                # This header should only already be present if the upload marker
+                # has been cleaned up and the current target uses the same
+                # upload-id; assuming the segments to use haven't changed, the work
+                # is already done
+                return HTTPOk(body=_make_complete_body(req, s3_etag, False),
+                              content_type='application/xml')
+            headers[s3_etag_header] = s3_etag
             # Leave base header value blank; SLO will populate
             c_etag = '; s3_etag=%s' % s3_etag
             headers[get_container_update_override_key('etag')] = c_etag
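The s3_etag compared here follows S3's multipart ETag convention: the hex MD5 of the concatenated binary MD5 digests of the parts, suffixed with the part count. A worked example of the computation:

    import hashlib

    parts = [b'a' * 5242880, b'b' * 5242880]        # two 5 MiB parts
    part_digests = [hashlib.md5(p).digest() for p in parts]
    s3_etag = '%s-%d' % (
        hashlib.md5(b''.join(part_digests)).hexdigest(), len(parts))
    # => '<32 hex chars>-2'; if the object's stored s3api etag sysmeta
    # already equals this, the early return above skips the SLO PUT.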
@@ -730,37 +771,13 @@ class UploadController(Controller):
                 try:
                     req.get_response(self.app, 'DELETE', container, obj)
                 except NoSuchKey:
+                    # We know that this existed long enough for us to HEAD
+                    # The important thing is that we wrote out a tombstone to
+                    # make sure the marker got cleaned up. If it's already
+                    # gone (e.g., because of concurrent completes or a retried
+                    # complete), so much the better.
                     pass
 
-                result_elem = Element('CompleteMultipartUploadResult')
-
-                # NOTE: boto with sig v4 appends port to HTTP_HOST value at
-                # the request header when the port is non default value and it
-                # makes req.host_url like as http://localhost:8080:8080/path
-                # that obviously invalid. Probably it should be resolved at
-                # swift.common.swob though, tentatively we are parsing and
-                # reconstructing the correct host_url info here.
-                # in detail, https://github.com/boto/boto/pull/3513
-                parsed_url = urlparse(req.host_url)
-                host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
-                # Why are we doing our own port parsing? Because py3 decided
-                # to start raising ValueErrors on access after parsing such
-                # an invalid port
-                netloc = parsed_url.netloc.split('@')[-1].split(']')[-1]
-                if ':' in netloc:
-                    port = netloc.split(':', 2)[1]
-                    host_url += ':%s' % port
-
-                SubElement(result_elem, 'Location').text = host_url + req.path
-                SubElement(result_elem, 'Bucket').text = req.container_name
-                SubElement(result_elem, 'Key').text = req.object_name
-                SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag
                 resp.headers.pop('ETag', None)
-                if yielded_anything:
-                    yield b'\n'
-                yield tostring(result_elem,
-                               xml_declaration=not yielded_anything)
+                yield _make_complete_body(req, s3_etag, yielded_anything)
             except ErrorResponse as err_resp:
                 if yielded_anything:
                     err_resp.xml_declaration = False
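The yielded_anything flag exists because the complete response may already be streaming: while the SLO PUT runs with heartbeat=on, the controller dribbles whitespace to keep the client connection alive, and the XML declaration is only legal if nothing has been yielded yet. A minimal sketch of the pattern (slo_put_in_progress and build_result_xml are hypothetical stand-ins for the real machinery):

    def response_body_iter():
        yielded_anything = False
        while slo_put_in_progress():           # hypothetical helper
            # dribble whitespace so a slow complete doesn't look like
            # a dead connection to the client
            yield b' '
            yielded_anything = True
        # '<?xml ...?>' may only appear at the very start of a document,
        # hence xml_declaration=not yielded_anything in the code above
        yield build_result_xml(not yielded_anything)   # hypothetical helper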
@@ -120,10 +120,6 @@ class S3Timestamp(utils.Timestamp):
         return self.isoformat.replace(
             '-', '').replace(':', '')[:-7] + 'Z'
 
-    @classmethod
-    def now(cls):
-        return cls(time.time())
-
 
 def mktime(timestamp_str, time_format='%Y-%m-%dT%H:%M:%S'):
     """
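With the override gone, S3Timestamp.now falls through to swift.common.utils.Timestamp.now, which accepts the delta keyword the new unit tests use. Judging by the '# 10s ago' comment on delta=-1000000 below, one delta tick is 10 microseconds; a sketch of the arithmetic under that assumption:

    import time

    TICK = 1e-5                      # assumed: one tick = 10 microseconds

    def shifted_now(delta):
        # mirrors Timestamp(time.time(), delta=...): shift the raw ticks
        raw = int(round(time.time() / TICK)) + delta
        return raw * TICK

    shifted_now(-1000000)            # ~10 seconds in the past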
@@ -15,6 +15,8 @@
 
 import unittest
 import traceback
+from contextlib import contextmanager
+import logging
 import test.functional as tf
 from test.functional.s3api.s3_test_client import (
     Connection, get_boto3_conn, tear_down_s3)
@@ -33,6 +35,14 @@ class S3ApiBase(unittest.TestCase):
         super(S3ApiBase, self).__init__(method_name)
         self.method_name = method_name
 
+    @contextmanager
+    def quiet_boto_logging(self):
+        try:
+            logging.getLogger('boto').setLevel(logging.INFO)
+            yield
+        finally:
+            logging.getLogger('boto').setLevel(logging.DEBUG)
+
     def setUp(self):
         if 's3api' not in tf.cluster_info:
             raise tf.SkipTest('s3api middleware is not enabled')
@@ -78,9 +78,9 @@ class TestS3ApiMultiUpload(S3ApiBase):
     def _upload_part(self, bucket, key, upload_id, content=None, part_num=1):
         query = 'partNumber=%s&uploadId=%s' % (part_num, upload_id)
         content = content if content else b'a' * self.min_segment_size
-        status, headers, body = \
-            self.conn.make_request('PUT', bucket, key, body=content,
-                                   query=query)
+        with self.quiet_boto_logging():
+            status, headers, body = self.conn.make_request(
+                'PUT', bucket, key, body=content, query=query)
         return status, headers, body
 
     def _upload_part_copy(self, src_bucket, src_obj, dst_bucket, dst_key,
@@ -113,7 +113,7 @@ class TestS3ApiMultiUpload(S3ApiBase):
         bucket = 'bucket'
         keys = ['obj1', 'obj2', 'obj3']
         bad_content_md5 = base64.b64encode(b'a' * 16).strip().decode('ascii')
-        headers = [None,
+        headers = [{'Content-Type': 'foo/bar', 'x-amz-meta-baz': 'quux'},
                    {'Content-MD5': bad_content_md5},
                    {'Etag': 'nonsense'}]
         uploads = []
@@ -293,7 +293,7 @@ class TestS3ApiMultiUpload(S3ApiBase):
             self._complete_multi_upload(bucket, key, upload_id, xml)
         self.assertEqual(status, 200)
         self.assertCommonResponseHeaders(headers)
-        self.assertTrue('content-type' in headers)
+        self.assertIn('content-type', headers)
         self.assertEqual(headers['content-type'], 'application/xml')
         if 'content-length' in headers:
             self.assertEqual(headers['content-length'], str(len(body)))
@@ -317,9 +317,46 @@ class TestS3ApiMultiUpload(S3ApiBase):
             self.assertEqual(etag, exp_etag)
 
         exp_size = self.min_segment_size * len(etags)
         status, headers, body = \
             self.conn.make_request('HEAD', bucket, key)
+        self.assertEqual(status, 200)
+        self.assertEqual(headers['content-length'], str(exp_size))
+        self.assertEqual(headers['content-type'], 'foo/bar')
+        self.assertEqual(headers['x-amz-meta-baz'], 'quux')
+
+        swift_etag = '"%s"' % md5(concatted_etags).hexdigest()
+        # TODO: GET via swift api, check against swift_etag
+
+        # Should be safe to retry
+        status, headers, body = \
+            self._complete_multi_upload(bucket, key, upload_id, xml)
+        self.assertEqual(status, 200)
+        self.assertCommonResponseHeaders(headers)
+        self.assertIn('content-type', headers)
+        self.assertEqual(headers['content-type'], 'application/xml')
+        if 'content-length' in headers:
+            self.assertEqual(headers['content-length'], str(len(body)))
+        else:
+            self.assertIn('transfer-encoding', headers)
+            self.assertEqual(headers['transfer-encoding'], 'chunked')
+        lines = body.split(b'\n')
+        self.assertTrue(lines[0].startswith(b'<?xml'), body)
+        self.assertTrue(lines[0].endswith(b'?>'), body)
+        elem = fromstring(body, 'CompleteMultipartUploadResult')
+        self.assertEqual(
+            '%s/bucket/obj1' % tf.config['s3_storage_url'].rstrip('/'),
+            elem.find('Location').text)
+        self.assertEqual(elem.find('Bucket').text, bucket)
+        self.assertEqual(elem.find('Key').text, key)
+        self.assertEqual(elem.find('ETag').text, exp_etag)
+
+        status, headers, body = \
+            self.conn.make_request('HEAD', bucket, key)
+        self.assertEqual(status, 200)
+        self.assertEqual(headers['content-length'], str(exp_size))
+        self.assertEqual(headers['content-type'], 'foo/bar')
+        self.assertEqual(headers['x-amz-meta-baz'], 'quux')
 
         # Upload Part Copy -- MU as source
         key, upload_id = uploads[1]
         status, headers, body, resp_etag = \
@@ -827,7 +827,7 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
         self.assertEqual(self.swift.calls, [
             # Bucket exists
             ('HEAD', '/v1/AUTH_test/bucket'),
-            # Segment container exists
+            # Upload marker exists
            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
            # Create the SLO
            ('PUT', '/v1/AUTH_test/bucket/object'
@@ -843,6 +843,123 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
         override_etag = '; s3_etag=%s' % S3_ETAG.strip('"')
         h = 'X-Object-Sysmeta-Container-Update-Override-Etag'
         self.assertEqual(headers.get(h), override_etag)
+        self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X')
+
+    def test_object_multipart_upload_retry_complete(self):
+        content_md5 = base64.b64encode(hashlib.md5(
+            XML.encode('ascii')).digest())
+        self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X',
+                            swob.HTTPNotFound, {}, None)
+        recent_ts = S3Timestamp.now(delta=-1000000).internal  # 10s ago
+        self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
+                            swob.HTTPOk,
+                            {'x-object-meta-foo': 'bar',
+                             'content-type': 'baz/quux',
+                             'x-object-sysmeta-s3api-upload-id': 'X',
+                             'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'),
+                             'x-timestamp': recent_ts}, None)
+        req = Request.blank('/bucket/object?uploadId=X',
+                            environ={'REQUEST_METHOD': 'POST'},
+                            headers={'Authorization': 'AWS test:tester:hmac',
+                                     'Date': self.get_date_header(),
+                                     'Content-MD5': content_md5, },
+                            body=XML)
+        status, headers, body = self.call_s3api(req)
+        elem = fromstring(body, 'CompleteMultipartUploadResult')
+        self.assertNotIn('Etag', headers)
+        self.assertEqual(elem.find('ETag').text, S3_ETAG)
+        self.assertEqual(status.split()[0], '200')
+
+        self.assertEqual(self.swift.calls, [
+            # Bucket exists
+            ('HEAD', '/v1/AUTH_test/bucket'),
+            # Upload marker does not exist
+            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
+            # But the object does, and with the same upload ID
+            ('HEAD', '/v1/AUTH_test/bucket/object'),
+            # So no PUT necessary
+        ])
+
+    def test_object_multipart_upload_retry_complete_etag_mismatch(self):
+        content_md5 = base64.b64encode(hashlib.md5(
+            XML.encode('ascii')).digest())
+        self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X',
+                            swob.HTTPNotFound, {}, None)
+        recent_ts = S3Timestamp.now(delta=-1000000).internal
+        self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
+                            swob.HTTPOk,
+                            {'x-object-meta-foo': 'bar',
+                             'content-type': 'baz/quux',
+                             'x-object-sysmeta-s3api-upload-id': 'X',
+                             'x-object-sysmeta-s3api-etag': 'not-the-etag',
+                             'x-timestamp': recent_ts}, None)
+        req = Request.blank('/bucket/object?uploadId=X',
+                            environ={'REQUEST_METHOD': 'POST'},
+                            headers={'Authorization': 'AWS test:tester:hmac',
+                                     'Date': self.get_date_header(),
+                                     'Content-MD5': content_md5, },
+                            body=XML)
+        status, headers, body = self.call_s3api(req)
+        elem = fromstring(body, 'CompleteMultipartUploadResult')
+        self.assertNotIn('Etag', headers)
+        self.assertEqual(elem.find('ETag').text, S3_ETAG)
+        self.assertEqual(status.split()[0], '200')
+
+        self.assertEqual(self.swift.calls, [
+            # Bucket exists
+            ('HEAD', '/v1/AUTH_test/bucket'),
+            # Upload marker does not exist
+            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
+            # But the object does, and with the same upload ID
+            ('HEAD', '/v1/AUTH_test/bucket/object'),
+            # Create the SLO
+            ('PUT', '/v1/AUTH_test/bucket/object'
+                    '?heartbeat=on&multipart-manifest=put'),
+            # Retry deleting the marker for the sake of completeness
+            ('DELETE', '/v1/AUTH_test/bucket+segments/object/X')
+        ])
+
+        _, _, headers = self.swift.calls_with_headers[-2]
+        self.assertEqual(headers.get('X-Object-Meta-Foo'), 'bar')
+        self.assertEqual(headers.get('Content-Type'), 'baz/quux')
+        # SLO will provide a base value
+        override_etag = '; s3_etag=%s' % S3_ETAG.strip('"')
+        h = 'X-Object-Sysmeta-Container-Update-Override-Etag'
+        self.assertEqual(headers.get(h), override_etag)
+        self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X')
+
+    def test_object_multipart_upload_retry_complete_upload_id_mismatch(self):
+        content_md5 = base64.b64encode(hashlib.md5(
+            XML.encode('ascii')).digest())
+        self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X',
+                            swob.HTTPNotFound, {}, None)
+        recent_ts = S3Timestamp.now(delta=-1000000).internal
+        self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
+                            swob.HTTPOk,
+                            {'x-object-meta-foo': 'bar',
+                             'content-type': 'baz/quux',
+                             'x-object-sysmeta-s3api-upload-id': 'Y',
+                             'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'),
+                             'x-timestamp': recent_ts}, None)
+        req = Request.blank('/bucket/object?uploadId=X',
+                            environ={'REQUEST_METHOD': 'POST'},
+                            headers={'Authorization': 'AWS test:tester:hmac',
+                                     'Date': self.get_date_header(),
+                                     'Content-MD5': content_md5, },
+                            body=XML)
+        status, headers, body = self.call_s3api(req)
+        elem = fromstring(body, 'Error')
+        self.assertEqual(elem.find('Code').text, 'NoSuchUpload')
+        self.assertEqual(status.split()[0], '404')
+
+        self.assertEqual(self.swift.calls, [
+            # Bucket exists
+            ('HEAD', '/v1/AUTH_test/bucket'),
+            # Upload marker does not exist
+            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
+            # But the object does, with a different upload ID
+            ('HEAD', '/v1/AUTH_test/bucket/object'),
+        ])
 
     def test_object_multipart_upload_invalid_md5(self):
         bad_md5 = base64.b64encode(hashlib.md5(
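A note on the content_md5 the new tests send: Content-MD5 is the base64 encoding of the raw 16-byte MD5 digest of the request body (not of its hex form), per standard S3/HTTP semantics:

    import base64
    import hashlib

    xml_body = b'<CompleteMultipartUpload>...</CompleteMultipartUpload>'
    content_md5 = base64.b64encode(hashlib.md5(xml_body).digest())
    assert len(content_md5) == 24    # 16 digest bytes -> 24 base64 chars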