Merge "s3api: Allow concurrent multi-deletes"
This commit is contained in:
commit
3043c54f28
@ -504,6 +504,10 @@ use = egg:swift#s3api
|
||||
# operation.
|
||||
# max_multi_delete_objects = 1000
|
||||
#
|
||||
# Set the number of objects to delete at a time with the Multi-Object Delete
|
||||
# operation.
|
||||
# multi_delete_concurrency = 2
|
||||
#
|
||||
# If set to 'true', s3api uses its own metadata for ACLs
|
||||
# (e.g. X-Container-Sysmeta-S3Api-Acl) to achieve the best S3 compatibility.
|
||||
# If set to 'false', s3api tries to use Swift ACLs (e.g. X-Container-Read)
|
||||
|
@ -13,8 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from swift.common.constraints import MAX_OBJECT_NAME_LENGTH
|
||||
from swift.common.utils import public
|
||||
from swift.common.utils import public, StreamingPile
|
||||
|
||||
from swift.common.middleware.s3api.controllers.base import Controller, \
|
||||
bucket_operation
|
||||
@ -102,11 +104,13 @@ class MultiObjectDeleteController(Controller):
|
||||
body = self._gen_error_body(error, elem, delete_list)
|
||||
return HTTPOk(body=body)
|
||||
|
||||
for key, version in delete_list:
|
||||
if version is not None:
|
||||
# TODO: delete the specific version of the object
|
||||
raise S3NotImplemented()
|
||||
if any(version is not None for _key, version in delete_list):
|
||||
# TODO: support deleting specific versions of objects
|
||||
raise S3NotImplemented()
|
||||
|
||||
def do_delete(base_req, key, version):
|
||||
req = copy.copy(base_req)
|
||||
req.environ = copy.copy(base_req.environ)
|
||||
req.object_name = key
|
||||
|
||||
try:
|
||||
@ -115,15 +119,20 @@ class MultiObjectDeleteController(Controller):
|
||||
except NoSuchKey:
|
||||
pass
|
||||
except ErrorResponse as e:
|
||||
error = SubElement(elem, 'Error')
|
||||
SubElement(error, 'Key').text = key
|
||||
SubElement(error, 'Code').text = e.__class__.__name__
|
||||
SubElement(error, 'Message').text = e._msg
|
||||
continue
|
||||
return key, {'code': e.__class__.__name__, 'message': e._msg}
|
||||
return key, None
|
||||
|
||||
if not self.quiet:
|
||||
deleted = SubElement(elem, 'Deleted')
|
||||
SubElement(deleted, 'Key').text = key
|
||||
with StreamingPile(self.conf.multi_delete_concurrency) as pile:
|
||||
for key, err in pile.asyncstarmap(do_delete, (
|
||||
(req, key, version) for key, version in delete_list)):
|
||||
if err:
|
||||
error = SubElement(elem, 'Error')
|
||||
SubElement(error, 'Key').text = key
|
||||
SubElement(error, 'Code').text = err['code']
|
||||
SubElement(error, 'Message').text = err['message']
|
||||
elif not self.quiet:
|
||||
deleted = SubElement(elem, 'Deleted')
|
||||
SubElement(deleted, 'Key').text = key
|
||||
|
||||
body = tostring(elem)
|
||||
|
||||
|
@ -195,6 +195,8 @@ class S3ApiMiddleware(object):
|
||||
conf.get('max_parts_listing', 1000))
|
||||
self.conf.max_multi_delete_objects = config_positive_int_value(
|
||||
conf.get('max_multi_delete_objects', 1000))
|
||||
self.conf.multi_delete_concurrency = config_positive_int_value(
|
||||
conf.get('multi_delete_concurrency', 2))
|
||||
self.conf.s3_acl = config_true_value(
|
||||
conf.get('s3_acl', False))
|
||||
self.conf.storage_domain = conf.get('storage_domain', '')
|
||||
|
@ -33,9 +33,6 @@ def tearDownModule():
|
||||
|
||||
|
||||
class TestS3ApiMultiDelete(S3ApiBase):
|
||||
def setUp(self):
|
||||
super(TestS3ApiMultiDelete, self).setUp()
|
||||
|
||||
def _prepare_test_delete_multi_objects(self, bucket, objects):
|
||||
self.conn.make_request('PUT', bucket)
|
||||
for obj in objects:
|
||||
|
@ -95,6 +95,43 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
|
||||
('DELETE', '/v1/AUTH_test/bucket/Key3?multipart-manifest=delete'),
|
||||
])
|
||||
|
||||
@s3acl
|
||||
def test_object_multi_DELETE_with_error(self):
|
||||
self.swift.register('HEAD', '/v1/AUTH_test/bucket/Key3',
|
||||
swob.HTTPForbidden, {}, None)
|
||||
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key2',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
|
||||
elem = Element('Delete')
|
||||
for key in ['Key1', 'Key2', 'Key3']:
|
||||
obj = SubElement(elem, 'Object')
|
||||
SubElement(obj, 'Key').text = key
|
||||
body = tostring(elem, use_s3ns=False)
|
||||
content_md5 = md5(body).digest().encode('base64').strip()
|
||||
|
||||
req = Request.blank('/bucket?delete',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac',
|
||||
'Content-Type': 'multipart/form-data',
|
||||
'Date': self.get_date_header(),
|
||||
'Content-MD5': content_md5},
|
||||
body=body)
|
||||
status, headers, body = self.call_s3api(req)
|
||||
self.assertEqual(status.split()[0], '200')
|
||||
|
||||
elem = fromstring(body)
|
||||
self.assertEqual(len(elem.findall('Deleted')), 2)
|
||||
self.assertEqual(len(elem.findall('Error')), 1)
|
||||
self.assertEqual(self.swift.calls, [
|
||||
('HEAD', '/v1/AUTH_test/bucket'),
|
||||
('HEAD', '/v1/AUTH_test/bucket/Key1'),
|
||||
('DELETE', '/v1/AUTH_test/bucket/Key1'),
|
||||
('HEAD', '/v1/AUTH_test/bucket/Key2'),
|
||||
('HEAD', '/v1/AUTH_test/bucket/Key3'),
|
||||
])
|
||||
|
||||
@s3acl
|
||||
def test_object_multi_DELETE_quiet(self):
|
||||
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
|
||||
@ -146,6 +183,31 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
|
||||
status, headers, body = self.call_s3api(req)
|
||||
self.assertEqual(self._get_error_code(body), 'UserKeyMustBeSpecified')
|
||||
|
||||
@s3acl
|
||||
def test_object_multi_DELETE_versioned(self):
|
||||
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
|
||||
swob.HTTPNoContent, {}, None)
|
||||
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key2',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
|
||||
elem = Element('Delete')
|
||||
SubElement(elem, 'Quiet').text = 'true'
|
||||
for key in ['Key1', 'Key2']:
|
||||
obj = SubElement(elem, 'Object')
|
||||
SubElement(obj, 'Key').text = key
|
||||
SubElement(obj, 'VersionId').text = 'not-supported'
|
||||
body = tostring(elem, use_s3ns=False)
|
||||
content_md5 = md5(body).digest().encode('base64').strip()
|
||||
|
||||
req = Request.blank('/bucket?delete',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Authorization': 'AWS test:tester:hmac',
|
||||
'Date': self.get_date_header(),
|
||||
'Content-MD5': content_md5},
|
||||
body=body)
|
||||
status, headers, body = self.call_s3api(req)
|
||||
self.assertEqual(self._get_error_code(body), 'NotImplemented')
|
||||
|
||||
@s3acl
|
||||
def test_object_multi_DELETE_with_invalid_md5(self):
|
||||
elem = Element('Delete')
|
||||
|
Loading…
Reference in New Issue
Block a user