Adds a retry mechanism when deleting containers
Bulk middleware now has a mechanism to retry a delete when the HTTP response code is 409. This happens when the container still has objects. It can be useful in a bulk delete where you delete all the objects in a container and then try to delete the container. It is very likely that at the end it will fail because the replica objects have not been deleted by the time the middleware got a successful response. Change-Id: I1614fcb5cc511be26a9dda90753dd08ec9546a3c Closes-Bug: #1253478
This commit is contained in:
parent
ba5fe5f39e
commit
96c9ff56fa
@ -496,6 +496,14 @@ use = egg:swift#bulk
|
||||
# max_failed_deletes = 1000
|
||||
# yield_frequency = 60
|
||||
|
||||
# Note: The following parameter is used during a bulk delete of objects and
|
||||
# their container. This would frequently fail because it is very likely
|
||||
# that all replicated objects have not been deleted by the time the middleware got a
|
||||
# successful response. It can be configured the number of retries. And the
|
||||
# number of seconds to wait between each retry will be 1.5**retry
|
||||
|
||||
# delete_container_retry_count = 0
|
||||
|
||||
# Note: Put after auth in the pipeline.
|
||||
[filter:container-quotas]
|
||||
use = egg:swift#container_quotas
|
||||
|
@ -17,6 +17,7 @@ import tarfile
|
||||
from urllib import quote, unquote
|
||||
from xml.sax import saxutils
|
||||
from time import time
|
||||
from eventlet import sleep
|
||||
import zlib
|
||||
from swift.common.swob import Request, HTTPBadGateway, \
|
||||
HTTPCreated, HTTPBadRequest, HTTPNotFound, HTTPUnauthorized, HTTPOk, \
|
||||
@ -24,7 +25,7 @@ from swift.common.swob import Request, HTTPBadGateway, \
|
||||
HTTPLengthRequired, HTTPException, HTTPServerError, wsgify
|
||||
from swift.common.utils import json, get_logger, register_swift_info
|
||||
from swift.common.constraints import check_utf8, MAX_FILE_SIZE
|
||||
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
|
||||
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND, HTTP_CONFLICT
|
||||
from swift.common.constraints import MAX_OBJECT_NAME_LENGTH, \
|
||||
MAX_CONTAINER_NAME_LENGTH
|
||||
|
||||
@ -186,7 +187,8 @@ class Bulk(object):
|
||||
|
||||
def __init__(self, app, conf, max_containers_per_extraction=10000,
|
||||
max_failed_extractions=1000, max_deletes_per_request=10000,
|
||||
max_failed_deletes=1000, yield_frequency=60):
|
||||
max_failed_deletes=1000, yield_frequency=60, retry_count=0,
|
||||
retry_interval=1.5):
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route='bulk')
|
||||
self.max_containers = max_containers_per_extraction
|
||||
@ -194,6 +196,8 @@ class Bulk(object):
|
||||
self.max_failed_deletes = max_failed_deletes
|
||||
self.max_deletes_per_request = max_deletes_per_request
|
||||
self.yield_frequency = yield_frequency
|
||||
self.retry_count = retry_count
|
||||
self.retry_interval = retry_interval
|
||||
|
||||
def create_container(self, req, container_path):
|
||||
"""
|
||||
@ -302,7 +306,7 @@ class Bulk(object):
|
||||
|
||||
if objs_to_delete is None:
|
||||
objs_to_delete = self.get_objs_to_delete(req)
|
||||
failed_file_response_type = HTTPBadRequest
|
||||
failed_file_response = {'type': HTTPBadRequest}
|
||||
req.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
for obj_to_delete in objs_to_delete:
|
||||
if last_yield + self.yield_frequency < time():
|
||||
@ -334,23 +338,12 @@ class Bulk(object):
|
||||
new_env['HTTP_USER_AGENT'] = \
|
||||
'%s %s' % (req.environ.get('HTTP_USER_AGENT'), user_agent)
|
||||
new_env['swift.source'] = swift_source
|
||||
delete_obj_req = Request.blank(delete_path, new_env)
|
||||
resp = delete_obj_req.get_response(self.app)
|
||||
if resp.status_int // 100 == 2:
|
||||
resp_dict['Number Deleted'] += 1
|
||||
elif resp.status_int == HTTP_NOT_FOUND:
|
||||
resp_dict['Number Not Found'] += 1
|
||||
elif resp.status_int == HTTP_UNAUTHORIZED:
|
||||
failed_files.append([quote(obj_name),
|
||||
HTTPUnauthorized().status])
|
||||
else:
|
||||
if resp.status_int // 100 == 5:
|
||||
failed_file_response_type = HTTPBadGateway
|
||||
failed_files.append([quote(obj_name), resp.status])
|
||||
self._process_delete(delete_path, obj_name, new_env, resp_dict,
|
||||
failed_files, failed_file_response)
|
||||
|
||||
if failed_files:
|
||||
resp_dict['Response Status'] = \
|
||||
failed_file_response_type().status
|
||||
failed_file_response['type']().status
|
||||
elif not (resp_dict['Number Deleted'] or
|
||||
resp_dict['Number Not Found']):
|
||||
resp_dict['Response Status'] = HTTPBadRequest().status
|
||||
@ -509,6 +502,29 @@ class Bulk(object):
|
||||
yield separator + get_response_body(
|
||||
out_content_type, resp_dict, failed_files)
|
||||
|
||||
def _process_delete(self, delete_path, obj_name, env, resp_dict,
|
||||
failed_files, failed_file_response, retry=0):
|
||||
delete_obj_req = Request.blank(delete_path, env)
|
||||
resp = delete_obj_req.get_response(self.app)
|
||||
if resp.status_int // 100 == 2:
|
||||
resp_dict['Number Deleted'] += 1
|
||||
elif resp.status_int == HTTP_NOT_FOUND:
|
||||
resp_dict['Number Not Found'] += 1
|
||||
elif resp.status_int == HTTP_UNAUTHORIZED:
|
||||
failed_files.append([quote(obj_name),
|
||||
HTTPUnauthorized().status])
|
||||
elif resp.status_int == HTTP_CONFLICT and \
|
||||
self.retry_count > 0 and self.retry_count > retry:
|
||||
retry += 1
|
||||
sleep(self.retry_interval ** retry)
|
||||
self._process_delete(delete_path, obj_name, env, resp_dict,
|
||||
failed_files, failed_file_response,
|
||||
retry)
|
||||
else:
|
||||
if resp.status_int // 100 == 5:
|
||||
failed_file_response['type'] = HTTPBadGateway
|
||||
failed_files.append([quote(obj_name), resp.status])
|
||||
|
||||
@wsgify
|
||||
def __call__(self, req):
|
||||
extract_type = req.params.get('extract-archive')
|
||||
@ -547,6 +563,8 @@ def filter_factory(global_conf, **local_conf):
|
||||
max_deletes_per_request = int(conf.get('max_deletes_per_request', 10000))
|
||||
max_failed_deletes = int(conf.get('max_failed_deletes', 1000))
|
||||
yield_frequency = int(conf.get('yield_frequency', 60))
|
||||
retry_count = int(conf.get('delete_container_retry_count', 0))
|
||||
retry_interval = 1.5
|
||||
|
||||
register_swift_info(
|
||||
'bulk_upload',
|
||||
@ -564,5 +582,7 @@ def filter_factory(global_conf, **local_conf):
|
||||
max_failed_extractions=max_failed_extractions,
|
||||
max_deletes_per_request=max_deletes_per_request,
|
||||
max_failed_deletes=max_failed_deletes,
|
||||
yield_frequency=yield_frequency)
|
||||
yield_frequency=yield_frequency,
|
||||
retry_count=retry_count,
|
||||
retry_interval=retry_interval)
|
||||
return bulk_filter
|
||||
|
@ -19,10 +19,12 @@ import os
|
||||
import tarfile
|
||||
import urllib
|
||||
import zlib
|
||||
import mock
|
||||
from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
from StringIO import StringIO
|
||||
from mock import patch
|
||||
from eventlet import sleep
|
||||
from mock import patch, call
|
||||
from swift.common import utils
|
||||
from swift.common.middleware import bulk
|
||||
from swift.common.swob import Request, Response, HTTPException
|
||||
@ -35,6 +37,8 @@ class FakeApp(object):
|
||||
self.calls = 0
|
||||
self.delete_paths = []
|
||||
self.max_pathlen = 100
|
||||
self.del_cont_total_calls = 2
|
||||
self.del_cont_cur_call = 0
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
self.calls += 1
|
||||
@ -78,6 +82,12 @@ class FakeApp(object):
|
||||
return Response(status='409 Conflict')(env, start_response)
|
||||
if env['PATH_INFO'].startswith('/broke/'):
|
||||
return Response(status='500 Internal Error')(env, start_response)
|
||||
if env['PATH_INFO'].startswith('/delete_cont_success_after_attempts/'):
|
||||
if self.del_cont_cur_call < self.del_cont_total_calls:
|
||||
self.del_cont_cur_call += 1
|
||||
return Response(status='409 Conflict')(env, start_response)
|
||||
else:
|
||||
return Response(status='204 No Content')(env, start_response)
|
||||
|
||||
|
||||
def build_dir_tree(start_path, tree_obj):
|
||||
@ -695,11 +705,51 @@ class TestDelete(unittest.TestCase):
|
||||
req = Request.blank('/delete_cont_fail/AUTH_Acc', body='c\n',
|
||||
headers={'Accept': 'application/json'})
|
||||
req.method = 'POST'
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 0)
|
||||
self.assertEquals(resp_data['Errors'], [['c', '409 Conflict']])
|
||||
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
|
||||
with patch('swift.common.middleware.bulk.sleep',
|
||||
new=mock.MagicMock(wraps=sleep,
|
||||
return_value=None)) as mock_sleep:
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 0)
|
||||
self.assertEquals(resp_data['Errors'], [['c', '409 Conflict']])
|
||||
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
|
||||
self.assertEquals([], mock_sleep.call_args_list)
|
||||
|
||||
def test_bulk_delete_container_delete_retry_and_fails(self):
|
||||
self.bulk.retry_count = 3
|
||||
req = Request.blank('/delete_cont_fail/AUTH_Acc', body='c\n',
|
||||
headers={'Accept': 'application/json'})
|
||||
req.method = 'POST'
|
||||
with patch('swift.common.middleware.bulk.sleep',
|
||||
new=mock.MagicMock(wraps=sleep,
|
||||
return_value=None)) as mock_sleep:
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 0)
|
||||
self.assertEquals(resp_data['Errors'], [['c', '409 Conflict']])
|
||||
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
|
||||
self.assertEquals([call(self.bulk.retry_interval),
|
||||
call(self.bulk.retry_interval ** 2),
|
||||
call(self.bulk.retry_interval ** 3)],
|
||||
mock_sleep.call_args_list)
|
||||
|
||||
def test_bulk_delete_container_delete_retry_and_success(self):
|
||||
self.bulk.retry_count = 3
|
||||
self.app.del_container_total = 2
|
||||
req = Request.blank('/delete_cont_success_after_attempts/AUTH_Acc',
|
||||
body='c\n', headers={'Accept': 'application/json'})
|
||||
req.method = 'DELETE'
|
||||
with patch('swift.common.middleware.bulk.sleep',
|
||||
new=mock.MagicMock(wraps=sleep,
|
||||
return_value=None)) as mock_sleep:
|
||||
resp_body = self.handle_delete_and_iter(req)
|
||||
resp_data = json.loads(resp_body)
|
||||
self.assertEquals(resp_data['Number Deleted'], 1)
|
||||
self.assertEquals(resp_data['Errors'], [])
|
||||
self.assertEquals(resp_data['Response Status'], '200 OK')
|
||||
self.assertEquals([call(self.bulk.retry_interval),
|
||||
call(self.bulk.retry_interval ** 2)],
|
||||
mock_sleep.call_args_list)
|
||||
|
||||
def test_bulk_delete_bad_file_too_long(self):
|
||||
req = Request.blank('/delete_works/AUTH_Acc',
|
||||
|
Loading…
x
Reference in New Issue
Block a user