Merge "s3api: Allow CompleteMultipartUpload requests to be retried"
commit ae85ee2474
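For context, the retry behavior being merged can be exercised end to end from any S3 client. A minimal sketch with boto3, assuming a local Swift proxy with s3api at http://localhost:8080 and placeholder credentials, bucket, and key:

# Illustrative only: retrying CompleteMultipartUpload against a Swift
# s3api endpoint. Endpoint, credentials, bucket, and key are placeholders.
import boto3

s3 = boto3.client('s3', endpoint_url='http://localhost:8080',
                  aws_access_key_id='test:tester',
                  aws_secret_access_key='testing')

upload = s3.create_multipart_upload(Bucket='bucket', Key='obj')
part = s3.upload_part(Bucket='bucket', Key='obj', PartNumber=1,
                      UploadId=upload['UploadId'],
                      Body=b'a' * 5 * 1024 * 1024)
parts = {'Parts': [{'PartNumber': 1, 'ETag': part['ETag']}]}

# With this change, sending the same CompleteMultipartUpload twice (for
# example after a timeout on the first attempt) returns 200 both times
# instead of NoSuchUpload on the retry.
for _ in range(2):
    resp = s3.complete_multipart_upload(Bucket='bucket', Key='obj',
                                        UploadId=upload['UploadId'],
                                        MultipartUpload=parts)
    print(resp['ETag'])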
@@ -108,6 +108,13 @@ def _get_upload_info(req, app, upload_id):
    try:
        return req.get_response(app, 'HEAD', container=container, obj=obj)
    except NoSuchKey:
        try:
            resp = req.get_response(app, 'HEAD')
            if resp.sysmeta_headers.get(sysmeta_header(
                    'object', 'upload-id')) == upload_id:
                return resp
        except NoSuchKey:
            pass
        raise NoSuchUpload(upload_id=upload_id)
    finally:
        # ...making sure to restore any copy-source before returning
@@ -115,9 +122,34 @@ def _get_upload_info(req, app, upload_id):
        req.headers['X-Amz-Copy-Source'] = copy_source


def _check_upload_info(req, app, upload_id):
def _make_complete_body(req, s3_etag, yielded_anything):
    result_elem = Element('CompleteMultipartUploadResult')

    _get_upload_info(req, app, upload_id)
    # NOTE: boto with sig v4 appends port to HTTP_HOST value at
    # the request header when the port is non default value and it
    # makes req.host_url like as http://localhost:8080:8080/path
    # that obviously invalid. Probably it should be resolved at
    # swift.common.swob though, tentatively we are parsing and
    # reconstructing the correct host_url info here.
    # in detail, https://github.com/boto/boto/pull/3513
    parsed_url = urlparse(req.host_url)
    host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
    # Why are we doing our own port parsing? Because py3 decided
    # to start raising ValueErrors on access after parsing such
    # an invalid port
    netloc = parsed_url.netloc.split('@')[-1].split(']')[-1]
    if ':' in netloc:
        port = netloc.split(':', 2)[1]
        host_url += ':%s' % port

    SubElement(result_elem, 'Location').text = host_url + req.path
    SubElement(result_elem, 'Bucket').text = req.container_name
    SubElement(result_elem, 'Key').text = req.object_name
    SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag
    body = tostring(result_elem, xml_declaration=not yielded_anything)
    if yielded_anything:
        return b'\n' + body
    return body


class PartController(Controller):
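The port handling in _make_complete_body above can be illustrated in isolation. A minimal sketch, assuming only the standard library; the helper name is illustrative and not part of the middleware:

# Illustrative sketch of the same workaround: rebuild a usable host URL
# from the doubled-port value boto with sig v4 can send.
from urllib.parse import urlparse

def reconstruct_host_url(host_url):
    # On a value like 'http://localhost:8080:8080', parsed.port raises
    # ValueError on py3, so only scheme and hostname are taken from
    # urlparse and the port is pulled out of the netloc by hand.
    parsed = urlparse(host_url)
    rebuilt = '%s://%s' % (parsed.scheme, parsed.hostname)
    netloc = parsed.netloc.split('@')[-1].split(']')[-1]
    if ':' in netloc:
        rebuilt += ':%s' % netloc.split(':', 2)[1]
    return rebuilt

assert reconstruct_host_url('http://localhost:8080:8080') == \
    'http://localhost:8080'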
@@ -152,7 +184,7 @@ class PartController(Controller):
                                  err_msg)

        upload_id = req.params['uploadId']
        _check_upload_info(req, self.app, upload_id)
        _get_upload_info(req, self.app, upload_id)

        req.container_name += MULTIUPLOAD_SUFFIX
        req.object_name = '%s/%s/%d' % (req.object_name, upload_id,
@@ -449,7 +481,7 @@ class UploadController(Controller):
            raise InvalidArgument('encoding-type', encoding_type, err_msg)

        upload_id = req.params['uploadId']
        _check_upload_info(req, self.app, upload_id)
        _get_upload_info(req, self.app, upload_id)

        maxparts = req.get_validated_param(
            'max-parts', DEFAULT_MAX_PARTS_LISTING,
@@ -536,7 +568,7 @@
        Handles Abort Multipart Upload.
        """
        upload_id = req.params['uploadId']
        _check_upload_info(req, self.app, upload_id)
        _get_upload_info(req, self.app, upload_id)

        # First check to see if this multi-part upload was already
        # completed. Look in the primary container, if the object exists,
@@ -580,7 +612,8 @@
        """
        upload_id = req.params['uploadId']
        resp = _get_upload_info(req, self.app, upload_id)
        headers = {'Accept': 'application/json'}
        headers = {'Accept': 'application/json',
                   sysmeta_header('object', 'upload-id'): upload_id}
        for key, val in resp.headers.items():
            _key = key.lower()
            if _key.startswith('x-amz-meta-'):
@@ -650,7 +683,15 @@
                raise

        s3_etag = '%s-%d' % (s3_etag_hasher.hexdigest(), len(manifest))
        headers[sysmeta_header('object', 'etag')] = s3_etag
        s3_etag_header = sysmeta_header('object', 'etag')
        if resp.sysmeta_headers.get(s3_etag_header) == s3_etag:
            # This header should only already be present if the upload marker
            # has been cleaned up and the current target uses the same
            # upload-id; assuming the segments to use haven't changed, the work
            # is already done
            return HTTPOk(body=_make_complete_body(req, s3_etag, False),
                          content_type='application/xml')
        headers[s3_etag_header] = s3_etag
        # Leave base header value blank; SLO will populate
        c_etag = '; s3_etag=%s' % s3_etag
        headers[get_container_update_override_key('etag')] = c_etag
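For reference, the s3_etag compared above follows the usual S3 multipart convention: the MD5 of the concatenated binary part MD5s, suffixed with the part count. A short sketch under that assumption; the helper name is illustrative, not middleware code:

import binascii
from hashlib import md5

def s3_multipart_etag(part_etags):
    # Hash the raw (binary) MD5 of each part, then append '-<part count>'
    # just as the middleware does with '%s-%d' % (hexdigest, len(manifest)).
    hasher = md5()
    for part_etag in part_etags:
        hasher.update(binascii.unhexlify(part_etag.strip('"')))
    return '%s-%d' % (hasher.hexdigest(), len(part_etags))

# Usage sketch: two identical parts still produce a '-2' suffixed ETag.
parts = [md5(b'a' * 5).hexdigest(), md5(b'a' * 5).hexdigest()]
print(s3_multipart_etag(parts))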
@@ -730,37 +771,13 @@
                try:
                    req.get_response(self.app, 'DELETE', container, obj)
                except NoSuchKey:
                    # We know that this existed long enough for us to HEAD
                    # The important thing is that we wrote out a tombstone to
                    # make sure the marker got cleaned up. If it's already
                    # gone (e.g., because of concurrent completes or a retried
                    # complete), so much the better.
                    pass

                result_elem = Element('CompleteMultipartUploadResult')

                # NOTE: boto with sig v4 appends port to HTTP_HOST value at
                # the request header when the port is non default value and it
                # makes req.host_url like as http://localhost:8080:8080/path
                # that obviously invalid. Probably it should be resolved at
                # swift.common.swob though, tentatively we are parsing and
                # reconstructing the correct host_url info here.
                # in detail, https://github.com/boto/boto/pull/3513
                parsed_url = urlparse(req.host_url)
                host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
                # Why are we doing our own port parsing? Because py3 decided
                # to start raising ValueErrors on access after parsing such
                # an invalid port
                netloc = parsed_url.netloc.split('@')[-1].split(']')[-1]
                if ':' in netloc:
                    port = netloc.split(':', 2)[1]
                    host_url += ':%s' % port

                SubElement(result_elem, 'Location').text = host_url + req.path
                SubElement(result_elem, 'Bucket').text = req.container_name
                SubElement(result_elem, 'Key').text = req.object_name
                SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag
                resp.headers.pop('ETag', None)
                if yielded_anything:
                    yield b'\n'
                yield tostring(result_elem,
                               xml_declaration=not yielded_anything)
                yield _make_complete_body(req, s3_etag, yielded_anything)
            except ErrorResponse as err_resp:
                if yielded_anything:
                    err_resp.xml_declaration = False
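The yielded_anything flag threaded through _make_complete_body exists because of the heartbeat pattern used while the manifest PUT runs: once filler bytes have been sent to keep the client connection alive, the closing XML may not begin with a declaration. A rough, self-contained sketch of that pattern; names, timings, and the body helper are illustrative, not the middleware's:

import time

def heartbeat_response(make_body, is_done, interval=0.5):
    yielded_anything = False
    while not is_done():
        # Filler whitespace already sent to the client; after this the
        # response can no longer start with an <?xml ...?> declaration.
        yield b' '
        yielded_anything = True
        time.sleep(interval)
    # Mirrors how _make_complete_body takes the flag to decide whether to
    # emit the declaration or a leading newline.
    yield make_body(yielded_anything)

# Usage sketch: pretend the "work" takes about a second to finish.
deadline = time.time() + 1
body = b''.join(heartbeat_response(
    lambda skipped_decl: (
        b'\n<CompleteMultipartUploadResult/>' if skipped_decl
        else b'<?xml version="1.0"?>\n<CompleteMultipartUploadResult/>'),
    lambda: time.time() >= deadline))
print(body)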
@@ -120,10 +120,6 @@ class S3Timestamp(utils.Timestamp):
        return self.isoformat.replace(
            '-', '').replace(':', '')[:-7] + 'Z'

    @classmethod
    def now(cls):
        return cls(time.time())


def mktime(timestamp_str, time_format='%Y-%m-%dT%H:%M:%S'):
    """
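Dropping the S3Timestamp.now override means the inherited swift.common.utils.Timestamp.now is used directly, which is what lets the new unit tests write S3Timestamp.now(delta=-1000000). A small sketch, assuming the delta keyword is applied at the internal 1e-5 second resolution (which is why -1000000 reads as roughly ten seconds in the past):

# Illustrative only; relies on the inherited Timestamp.now() accepting
# the same keyword arguments as the Timestamp constructor.
from swift.common.middleware.s3api.utils import S3Timestamp

now = S3Timestamp.now()
earlier = S3Timestamp.now(delta=-1000000)  # ~10s ago, per the test comment
print(now.internal, earlier.internal)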
@@ -15,6 +15,8 @@

import unittest
import traceback
from contextlib import contextmanager
import logging
import test.functional as tf
from test.functional.s3api.s3_test_client import (
    Connection, get_boto3_conn, tear_down_s3)
@@ -33,6 +35,14 @@ class S3ApiBase(unittest.TestCase):
        super(S3ApiBase, self).__init__(method_name)
        self.method_name = method_name

    @contextmanager
    def quiet_boto_logging(self):
        try:
            logging.getLogger('boto').setLevel(logging.INFO)
            yield
        finally:
            logging.getLogger('boto').setLevel(logging.DEBUG)

    def setUp(self):
        if 's3api' not in tf.cluster_info:
            raise tf.SkipTest('s3api middleware is not enabled')
@@ -78,9 +78,9 @@ class TestS3ApiMultiUpload(S3ApiBase):
    def _upload_part(self, bucket, key, upload_id, content=None, part_num=1):
        query = 'partNumber=%s&uploadId=%s' % (part_num, upload_id)
        content = content if content else b'a' * self.min_segment_size
        status, headers, body = \
            self.conn.make_request('PUT', bucket, key, body=content,
                                   query=query)
        with self.quiet_boto_logging():
            status, headers, body = self.conn.make_request(
                'PUT', bucket, key, body=content, query=query)
        return status, headers, body

    def _upload_part_copy(self, src_bucket, src_obj, dst_bucket, dst_key,
@@ -113,7 +113,7 @@ class TestS3ApiMultiUpload(S3ApiBase):
        bucket = 'bucket'
        keys = ['obj1', 'obj2', 'obj3']
        bad_content_md5 = base64.b64encode(b'a' * 16).strip().decode('ascii')
        headers = [None,
        headers = [{'Content-Type': 'foo/bar', 'x-amz-meta-baz': 'quux'},
                   {'Content-MD5': bad_content_md5},
                   {'Etag': 'nonsense'}]
        uploads = []
@@ -293,7 +293,7 @@ class TestS3ApiMultiUpload(S3ApiBase):
            self._complete_multi_upload(bucket, key, upload_id, xml)
        self.assertEqual(status, 200)
        self.assertCommonResponseHeaders(headers)
        self.assertTrue('content-type' in headers)
        self.assertIn('content-type', headers)
        self.assertEqual(headers['content-type'], 'application/xml')
        if 'content-length' in headers:
            self.assertEqual(headers['content-length'], str(len(body)))
@@ -317,9 +317,46 @@
            self.assertEqual(etag, exp_etag)

        exp_size = self.min_segment_size * len(etags)
        status, headers, body = \
            self.conn.make_request('HEAD', bucket, key)
        self.assertEqual(status, 200)
        self.assertEqual(headers['content-length'], str(exp_size))
        self.assertEqual(headers['content-type'], 'foo/bar')
        self.assertEqual(headers['x-amz-meta-baz'], 'quux')

        swift_etag = '"%s"' % md5(concatted_etags).hexdigest()
        # TODO: GET via swift api, check against swift_etag

        # Should be safe to retry
        status, headers, body = \
            self._complete_multi_upload(bucket, key, upload_id, xml)
        self.assertEqual(status, 200)
        self.assertCommonResponseHeaders(headers)
        self.assertIn('content-type', headers)
        self.assertEqual(headers['content-type'], 'application/xml')
        if 'content-length' in headers:
            self.assertEqual(headers['content-length'], str(len(body)))
        else:
            self.assertIn('transfer-encoding', headers)
            self.assertEqual(headers['transfer-encoding'], 'chunked')
        lines = body.split(b'\n')
        self.assertTrue(lines[0].startswith(b'<?xml'), body)
        self.assertTrue(lines[0].endswith(b'?>'), body)
        elem = fromstring(body, 'CompleteMultipartUploadResult')
        self.assertEqual(
            '%s/bucket/obj1' % tf.config['s3_storage_url'].rstrip('/'),
            elem.find('Location').text)
        self.assertEqual(elem.find('Bucket').text, bucket)
        self.assertEqual(elem.find('Key').text, key)
        self.assertEqual(elem.find('ETag').text, exp_etag)

        status, headers, body = \
            self.conn.make_request('HEAD', bucket, key)
        self.assertEqual(status, 200)
        self.assertEqual(headers['content-length'], str(exp_size))
        self.assertEqual(headers['content-type'], 'foo/bar')
        self.assertEqual(headers['x-amz-meta-baz'], 'quux')

        # Upload Part Copy -- MU as source
        key, upload_id = uploads[1]
        status, headers, body, resp_etag = \
@@ -827,7 +827,7 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
        self.assertEqual(self.swift.calls, [
            # Bucket exists
            ('HEAD', '/v1/AUTH_test/bucket'),
            # Segment container exists
            # Upload marker exists
            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
            # Create the SLO
            ('PUT', '/v1/AUTH_test/bucket/object'
@@ -843,6 +843,123 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
        override_etag = '; s3_etag=%s' % S3_ETAG.strip('"')
        h = 'X-Object-Sysmeta-Container-Update-Override-Etag'
        self.assertEqual(headers.get(h), override_etag)
        self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X')

    def test_object_multipart_upload_retry_complete(self):
        content_md5 = base64.b64encode(hashlib.md5(
            XML.encode('ascii')).digest())
        self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X',
                            swob.HTTPNotFound, {}, None)
        recent_ts = S3Timestamp.now(delta=-1000000).internal  # 10s ago
        self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
                            swob.HTTPOk,
                            {'x-object-meta-foo': 'bar',
                             'content-type': 'baz/quux',
                             'x-object-sysmeta-s3api-upload-id': 'X',
                             'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'),
                             'x-timestamp': recent_ts}, None)
        req = Request.blank('/bucket/object?uploadId=X',
                            environ={'REQUEST_METHOD': 'POST'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'Content-MD5': content_md5, },
                            body=XML)
        status, headers, body = self.call_s3api(req)
        elem = fromstring(body, 'CompleteMultipartUploadResult')
        self.assertNotIn('Etag', headers)
        self.assertEqual(elem.find('ETag').text, S3_ETAG)
        self.assertEqual(status.split()[0], '200')

        self.assertEqual(self.swift.calls, [
            # Bucket exists
            ('HEAD', '/v1/AUTH_test/bucket'),
            # Upload marker does not exist
            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
            # But the object does, and with the same upload ID
            ('HEAD', '/v1/AUTH_test/bucket/object'),
            # So no PUT necessary
        ])

    def test_object_multipart_upload_retry_complete_etag_mismatch(self):
        content_md5 = base64.b64encode(hashlib.md5(
            XML.encode('ascii')).digest())
        self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X',
                            swob.HTTPNotFound, {}, None)
        recent_ts = S3Timestamp.now(delta=-1000000).internal
        self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
                            swob.HTTPOk,
                            {'x-object-meta-foo': 'bar',
                             'content-type': 'baz/quux',
                             'x-object-sysmeta-s3api-upload-id': 'X',
                             'x-object-sysmeta-s3api-etag': 'not-the-etag',
                             'x-timestamp': recent_ts}, None)
        req = Request.blank('/bucket/object?uploadId=X',
                            environ={'REQUEST_METHOD': 'POST'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'Content-MD5': content_md5, },
                            body=XML)
        status, headers, body = self.call_s3api(req)
        elem = fromstring(body, 'CompleteMultipartUploadResult')
        self.assertNotIn('Etag', headers)
        self.assertEqual(elem.find('ETag').text, S3_ETAG)
        self.assertEqual(status.split()[0], '200')

        self.assertEqual(self.swift.calls, [
            # Bucket exists
            ('HEAD', '/v1/AUTH_test/bucket'),
            # Upload marker does not exist
            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
            # But the object does, and with the same upload ID
            ('HEAD', '/v1/AUTH_test/bucket/object'),
            # Create the SLO
            ('PUT', '/v1/AUTH_test/bucket/object'
             '?heartbeat=on&multipart-manifest=put'),
            # Retry deleting the marker for the sake of completeness
            ('DELETE', '/v1/AUTH_test/bucket+segments/object/X')
        ])

        _, _, headers = self.swift.calls_with_headers[-2]
        self.assertEqual(headers.get('X-Object-Meta-Foo'), 'bar')
        self.assertEqual(headers.get('Content-Type'), 'baz/quux')
        # SLO will provide a base value
        override_etag = '; s3_etag=%s' % S3_ETAG.strip('"')
        h = 'X-Object-Sysmeta-Container-Update-Override-Etag'
        self.assertEqual(headers.get(h), override_etag)
        self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X')

    def test_object_multipart_upload_retry_complete_upload_id_mismatch(self):
        content_md5 = base64.b64encode(hashlib.md5(
            XML.encode('ascii')).digest())
        self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X',
                            swob.HTTPNotFound, {}, None)
        recent_ts = S3Timestamp.now(delta=-1000000).internal
        self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
                            swob.HTTPOk,
                            {'x-object-meta-foo': 'bar',
                             'content-type': 'baz/quux',
                             'x-object-sysmeta-s3api-upload-id': 'Y',
                             'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'),
                             'x-timestamp': recent_ts}, None)
        req = Request.blank('/bucket/object?uploadId=X',
                            environ={'REQUEST_METHOD': 'POST'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'Content-MD5': content_md5, },
                            body=XML)
        status, headers, body = self.call_s3api(req)
        elem = fromstring(body, 'Error')
        self.assertEqual(elem.find('Code').text, 'NoSuchUpload')
        self.assertEqual(status.split()[0], '404')

        self.assertEqual(self.swift.calls, [
            # Bucket exists
            ('HEAD', '/v1/AUTH_test/bucket'),
            # Upload marker does not exist
            ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
            # But the object does, though with a different upload ID
            ('HEAD', '/v1/AUTH_test/bucket/object'),
        ])

    def test_object_multipart_upload_invalid_md5(self):
        bad_md5 = base64.b64encode(hashlib.md5(
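Taken together, the three retry tests above pin down a small decision table for a retried CompleteMultipartUpload whose upload marker is already gone. A paraphrase as a standalone helper, illustrative only and not middleware code:

def retried_complete_outcome(marker_exists, obj_upload_id, req_upload_id,
                             stored_s3_etag, computed_s3_etag):
    # Summarizes the backend call sequences asserted in the unit tests.
    if marker_exists:
        return 'normal complete'
    if obj_upload_id != req_upload_id:
        return '404 NoSuchUpload'
    if stored_s3_etag == computed_s3_etag:
        return '200, reuse existing manifest (no PUT)'
    return '200, re-PUT manifest and re-delete marker'

print(retried_complete_outcome(False, 'X', 'X', 'etag-1', 'etag-1'))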