Merge "Adds a retry mechanism when deleting containers"
This commit is contained in:
commit
c709cf90fc
@ -496,6 +496,14 @@ use = egg:swift#bulk
|
||||
# max_failed_deletes = 1000
|
||||
# yield_frequency = 60
|
||||
|
||||
# Note: The following parameter is used during a bulk delete of objects and
|
||||
# their container. This would frequently fail because it is very likely
|
||||
# that all replicated objects have not been deleted by the time the middleware got a
|
||||
# successful response. It can be configured the number of retries. And the
|
||||
# number of seconds to wait between each retry will be 1.5**retry
|
||||
|
||||
# delete_container_retry_count = 0
|
||||
|
||||
# Note: Put after auth in the pipeline.
|
||||
[filter:container-quotas]
|
||||
use = egg:swift#container_quotas
|
||||
|
@ -17,6 +17,7 @@ import tarfile
|
||||
from urllib import quote, unquote
|
||||
from xml.sax import saxutils
|
||||
from time import time
|
||||
from eventlet import sleep
|
||||
import zlib
|
||||
from swift.common.swob import Request, HTTPBadGateway, \
|
||||
HTTPCreated, HTTPBadRequest, HTTPNotFound, HTTPUnauthorized, HTTPOk, \
|
||||
@ -24,7 +25,7 @@ from swift.common.swob import Request, HTTPBadGateway, \
|
||||
HTTPLengthRequired, HTTPException, HTTPServerError, wsgify
|
||||
from swift.common.utils import json, get_logger, register_swift_info
|
||||
from swift.common.constraints import check_utf8, MAX_FILE_SIZE
|
||||
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
|
||||
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND, HTTP_CONFLICT
|
||||
from swift.common.constraints import MAX_OBJECT_NAME_LENGTH, \
|
||||
MAX_CONTAINER_NAME_LENGTH
|
||||
|
||||
@ -186,7 +187,8 @@ class Bulk(object):
|
||||
|
||||
def __init__(self, app, conf, max_containers_per_extraction=10000,
|
||||
max_failed_extractions=1000, max_deletes_per_request=10000,
|
||||
max_failed_deletes=1000, yield_frequency=60):
|
||||
max_failed_deletes=1000, yield_frequency=60, retry_count=0,
|
||||
retry_interval=1.5):
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route='bulk')
|
||||
self.max_containers = max_containers_per_extraction
|
||||
@ -194,6 +196,8 @@ class Bulk(object):
|
||||
self.max_failed_deletes = max_failed_deletes
|
||||
self.max_deletes_per_request = max_deletes_per_request
|
||||
self.yield_frequency = yield_frequency
|
||||
self.retry_count = retry_count
|
||||
self.retry_interval = retry_interval
|
||||
|
||||
def create_container(self, req, container_path):
|
||||
"""
|
||||
@ -302,7 +306,7 @@ class Bulk(object):
|
||||
|
||||
if objs_to_delete is None:
|
||||
objs_to_delete = self.get_objs_to_delete(req)
|
||||
failed_file_response_type = HTTPBadRequest
|
||||
failed_file_response = {'type': HTTPBadRequest}
|
||||
req.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
for obj_to_delete in objs_to_delete:
|
||||
if last_yield + self.yield_frequency < time():
|
||||
@ -334,23 +338,12 @@ class Bulk(object):
|
||||
new_env['HTTP_USER_AGENT'] = \
|
||||
'%s %s' % (req.environ.get('HTTP_USER_AGENT'), user_agent)
|
||||
new_env['swift.source'] = swift_source
|
||||
delete_obj_req = Request.blank(delete_path, new_env)
|
||||
resp = delete_obj_req.get_response(self.app)
|
||||
if resp.status_int // 100 == 2:
|
||||
resp_dict['Number Deleted'] += 1
|
||||
elif resp.status_int == HTTP_NOT_FOUND:
|
||||
resp_dict['Number Not Found'] += 1
|
||||
elif resp.status_int == HTTP_UNAUTHORIZED:
|
||||
failed_files.append([quote(obj_name),
|
||||
HTTPUnauthorized().status])
|
||||
else:
|
||||
if resp.status_int // 100 == 5:
|
||||
failed_file_response_type = HTTPBadGateway
|
||||
failed_files.append([quote(obj_name), resp.status])
|
||||
self._process_delete(delete_path, obj_name, new_env, resp_dict,
|
||||
failed_files, failed_file_response)
|
||||
|
||||
if failed_files:
|
||||
resp_dict['Response Status'] = \
|
||||
failed_file_response_type().status
|
||||
failed_file_response['type']().status
|
||||
elif not (resp_dict['Number Deleted'] or
|
||||
resp_dict['Number Not Found']):
|
||||
resp_dict['Response Status'] = HTTPBadRequest().status
|
||||
@ -509,6 +502,29 @@ class Bulk(object):
|
||||
yield separator + get_response_body(
|
||||
out_content_type, resp_dict, failed_files)
|
||||
|
||||
def _process_delete(self, delete_path, obj_name, env, resp_dict,
|
||||
failed_files, failed_file_response, retry=0):
|
||||
delete_obj_req = Request.blank(delete_path, env)
|
||||
resp = delete_obj_req.get_response(self.app)
|
||||
if resp.status_int // 100 == 2:
|
||||
resp_dict['Number Deleted'] += 1
|
||||
elif resp.status_int == HTTP_NOT_FOUND:
|
||||
resp_dict['Number Not Found'] += 1
|
||||
elif resp.status_int == HTTP_UNAUTHORIZED:
|
||||
failed_files.append([quote(obj_name),
|
||||
HTTPUnauthorized().status])
|
||||
elif resp.status_int == HTTP_CONFLICT and \
|
||||
self.retry_count > 0 and self.retry_count > retry:
|
||||
retry += 1
|
||||
sleep(self.retry_interval ** retry)
|
||||
self._process_delete(delete_path, obj_name, env, resp_dict,
|
||||
failed_files, failed_file_response,
|
||||
retry)
|
||||
else:
|
||||
if resp.status_int // 100 == 5:
|
||||
failed_file_response['type'] = HTTPBadGateway
|
||||
failed_files.append([quote(obj_name), resp.status])
|
||||
|
||||
@wsgify
|
||||
def __call__(self, req):
|
||||
extract_type = req.params.get('extract-archive')
|
||||
@ -547,6 +563,8 @@ def filter_factory(global_conf, **local_conf):
|
||||
max_deletes_per_request = int(conf.get('max_deletes_per_request', 10000))
|
||||
max_failed_deletes = int(conf.get('max_failed_deletes', 1000))
|
||||
yield_frequency = int(conf.get('yield_frequency', 60))
|
||||
retry_count = int(conf.get('delete_container_retry_count', 0))
|
||||
retry_interval = 1.5
|
||||
|
||||
register_swift_info(
|
||||
'bulk_upload',
|
||||
@ -564,5 +582,7 @@ def filter_factory(global_conf, **local_conf):
|
||||
max_failed_extractions=max_failed_extractions,
|
||||
max_deletes_per_request=max_deletes_per_request,
|
||||
max_failed_deletes=max_failed_deletes,
|
||||
yield_frequency=yield_frequency)
|
||||
yield_frequency=yield_frequency,
|
||||
retry_count=retry_count,
|
||||
retry_interval=retry_interval)
|
||||
return bulk_filter
|
||||
|
@ -19,10 +19,12 @@ import os
|
||||
import tarfile
|
||||
import urllib
|
||||
import zlib
|
||||
import mock
|
||||
from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
from StringIO import StringIO
|
||||
from mock import patch
|
||||
from eventlet import sleep
|
||||
from mock import patch, call
|
||||
from swift.common import utils
|
||||
from swift.common.middleware import bulk
|
||||
from swift.common.swob import Request, Response, HTTPException
|
||||
@ -35,6 +37,8 @@ class FakeApp(object):
|
||||
self.calls = 0
|
||||
self.delete_paths = []
|
||||
self.max_pathlen = 100
|
||||
self.del_cont_total_calls = 2
|
||||
self.del_cont_cur_call = 0
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
self.calls += 1
|
||||
@ -78,6 +82,12 @@ class FakeApp(object):
|
||||
return Response(status='409 Conflict')(env, start_response)
|
||||
if env['PATH_INFO'].startswith('/broke/'):
|
||||
return Response(status='500 Internal Error')(env, start_response)
|
||||
if env['PATH_INFO'].startswith('/delete_cont_success_after_attempts/'):
|
||||
if self.del_cont_cur_call < self.del_cont_total_calls:
|
||||
self.del_cont_cur_call += 1
|
||||
return Response(status='409 Conflict')(env, start_response)
|
||||
else:
|
||||
return Response(status='204 No Content')(env, start_response)
|
||||
|
||||
|
||||
def build_dir_tree(start_path, tree_obj):
|
||||
@ -695,11 +705,51 @@ class TestDelete(unittest.TestCase):
|
||||
req = Request.blank('/delete_cont_fail/AUTH_Acc', body='c\n',
|
||||
headers={'Accept': 'application/json'})
|
||||
req.method = 'POST'
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 0)
|
||||
self.assertEquals(resp_data['Errors'], [['c', '409 Conflict']])
|
||||
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
|
||||
with patch('swift.common.middleware.bulk.sleep',
|
||||
new=mock.MagicMock(wraps=sleep,
|
||||
return_value=None)) as mock_sleep:
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 0)
|
||||
self.assertEquals(resp_data['Errors'], [['c', '409 Conflict']])
|
||||
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
|
||||
self.assertEquals([], mock_sleep.call_args_list)
|
||||
|
||||
def test_bulk_delete_container_delete_retry_and_fails(self):
|
||||
self.bulk.retry_count = 3
|
||||
req = Request.blank('/delete_cont_fail/AUTH_Acc', body='c\n',
|
||||
headers={'Accept': 'application/json'})
|
||||
req.method = 'POST'
|
||||
with patch('swift.common.middleware.bulk.sleep',
|
||||
new=mock.MagicMock(wraps=sleep,
|
||||
return_value=None)) as mock_sleep:
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 0)
|
||||
self.assertEquals(resp_data['Errors'], [['c', '409 Conflict']])
|
||||
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
|
||||
self.assertEquals([call(self.bulk.retry_interval),
|
||||
call(self.bulk.retry_interval ** 2),
|
||||
call(self.bulk.retry_interval ** 3)],
|
||||
mock_sleep.call_args_list)
|
||||
|
||||
def test_bulk_delete_container_delete_retry_and_success(self):
|
||||
self.bulk.retry_count = 3
|
||||
self.app.del_container_total = 2
|
||||
req = Request.blank('/delete_cont_success_after_attempts/AUTH_Acc',
|
||||
body='c\n', headers={'Accept': 'application/json'})
|
||||
req.method = 'DELETE'
|
||||
with patch('swift.common.middleware.bulk.sleep',
|
||||
new=mock.MagicMock(wraps=sleep,
|
||||
return_value=None)) as mock_sleep:
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 1)
|
||||
self.assertEquals(resp_data['Errors'], [])
|
||||
self.assertEquals(resp_data['Response Status'], '200 OK')
|
||||
self.assertEquals([call(self.bulk.retry_interval),
|
||||
call(self.bulk.retry_interval ** 2)],
|
||||
mock_sleep.call_args_list)
|
||||
|
||||
def test_bulk_delete_bad_file_too_long(self):
|
||||
req = Request.blank('/delete_works/AUTH_Acc',
|
||||
|
Loading…
x
Reference in New Issue
Block a user