From e09c4ee7800e82aa09ca2f6ae375420b766182a4 Mon Sep 17 00:00:00 2001
From: Tim Burke
Date: Fri, 29 Apr 2016 12:12:00 -0500
Subject: [PATCH] Allow concurrent bulk deletes

Before, server-side deletes of static large objects could take a long
time to complete, since the proxy would wait for a response to each
segment DELETE before starting the next DELETE request.

Now, operators can configure a concurrency factor for the slo and bulk
middlewares to allow up to N concurrent DELETE requests. By default,
two DELETE requests will be allowed at a time.

Note that objects and containers are now deleted in separate passes, to
reduce the likelihood of 409 Conflict responses when deleting
containers.

Upgrade Consideration
=====================
If operators have enabled the bulk or slo middlewares and would like to
preserve the prior (single-threaded) DELETE behavior, they must add the
following line to their [filter:slo] and [filter:bulk] proxy config
sections:

    delete_concurrency = 1

This may be done prior to upgrading Swift.

UpgradeImpact
Closes-Bug: 1524454
Change-Id: I128374d74a4cef7a479b221fd15eec785cc4694a
---
 etc/proxy-server.conf-sample             |  16 +++-
 swift/common/middleware/bulk.py          | 114 +++++++++++++++-------
 swift/common/middleware/slo.py           |   4 +-
 swift/common/utils.py                    |  42 ++++++++
 test/unit/common/middleware/test_bulk.py | 116 +++++++++++++++++++----
 test/unit/common/middleware/test_slo.py  |  24 +++--
 6 files changed, 247 insertions(+), 69 deletions(-)

diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample
index b5cfbf873b..6a4962ff9c 100644
--- a/etc/proxy-server.conf-sample
+++ b/etc/proxy-server.conf-sample
@@ -618,19 +618,23 @@ use = egg:swift#bulk
 # max_failed_extractions = 1000
 # max_deletes_per_request = 10000
 # max_failed_deletes = 1000
-
+#
 # In order to keep a connection active during a potentially long bulk request,
 # Swift may return whitespace prepended to the actual response body. This
 # whitespace will be yielded no more than every yield_frequency seconds.
 # yield_frequency = 10
-
+#
 # Note: The following parameter is used during a bulk delete of objects and
 # their container. This would frequently fail because it is very likely
 # that not all replicas of an object have been deleted by the time the
 # middleware gets a successful response. The number of retries can be
 # configured, and the wait between each retry will be 1.5**retry seconds.
-
 # delete_container_retry_count = 0
+#
+# To speed up the bulk delete process, multiple deletes may be executed in
+# parallel. Avoid setting this too high, as it gives clients a force multiplier
+# which may be used in DoS attacks. The suggested range is between 2 and 10.
+# delete_concurrency = 2

 # Note: Put after auth and staticweb in the pipeline.
 [filter:slo]
@@ -651,6 +655,12 @@ use = egg:swift#slo
 #
 # Time limit on GET requests (seconds)
 # max_get_time = 86400
+#
+# When deleting with ?multipart-manifest=delete, multiple deletes may be
+# executed in parallel. Avoid setting this too high, as it gives clients a
+# force multiplier which may be used in DoS attacks. The suggested range is
+# between 2 and 10.
+# delete_concurrency = 2

 # Note: Put after auth and staticweb in the pipeline.
 # If you don't put it in the pipeline, it will be inserted for you.
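For a sense of what these options govern from the client side, here is a
minimal sketch of a bulk delete request; the storage URL and token are
hypothetical placeholders, and the third-party requests library is assumed.
None of this is part of the patch:

    import json
    import requests  # assumed available; any HTTP client works

    STORAGE_URL = 'http://proxy.example.com/v1/AUTH_test'  # hypothetical
    TOKEN = 'AUTH_tk_placeholder'  # hypothetical

    # Newline-separated, URL-encoded <container>/<object> paths; bare
    # container names may also be listed, and with this patch they are
    # deleted in a second pass, after all objects.
    names = '\n'.join(['c/f1', 'c/f2', 'c'])

    resp = requests.post(
        STORAGE_URL + '?bulk-delete',
        headers={'X-Auth-Token': TOKEN,
                 'Accept': 'application/json',
                 'Content-Type': 'text/plain'},
        data=names)

    # Per the yield_frequency comment above, the proxy may prepend
    # whitespace to keep the connection alive; strip it before parsing.
    result = json.loads(resp.text.lstrip())
    print(result['Number Deleted'], result['Errors'])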
diff --git a/swift/common/middleware/bulk.py b/swift/common/middleware/bulk.py
index 0dd4aa12b2..3c394d2f8c 100644
--- a/swift/common/middleware/bulk.py
+++ b/swift/common/middleware/bulk.py
@@ -201,7 +201,8 @@ from swift.common.swob import Request, HTTPBadGateway, \
     HTTPCreated, HTTPBadRequest, HTTPNotFound, HTTPUnauthorized, HTTPOk, \
     HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPNotAcceptable, \
     HTTPLengthRequired, HTTPException, HTTPServerError, wsgify
-from swift.common.utils import get_logger, register_swift_info
+from swift.common.utils import get_logger, register_swift_info, \
+    StreamingPile
 from swift.common import constraints
 from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND, HTTP_CONFLICT

@@ -274,8 +275,9 @@ class Bulk(object):

     def __init__(self, app, conf, max_containers_per_extraction=10000,
                  max_failed_extractions=1000, max_deletes_per_request=10000,
-                 max_failed_deletes=1000, yield_frequency=10, retry_count=0,
-                 retry_interval=1.5, logger=None):
+                 max_failed_deletes=1000, yield_frequency=10,
+                 delete_concurrency=2, retry_count=0, retry_interval=1.5,
+                 logger=None):
         self.app = app
         self.logger = logger or get_logger(conf, log_route='bulk')
         self.max_containers = max_containers_per_extraction
@@ -283,6 +285,7 @@ class Bulk(object):
         self.max_failed_deletes = max_failed_deletes
         self.max_deletes_per_request = max_deletes_per_request
         self.yield_frequency = yield_frequency
+        self.delete_concurrency = min(1000, max(1, delete_concurrency))
         self.retry_count = retry_count
         self.retry_interval = retry_interval
         self.max_path_length = constraints.MAX_OBJECT_NAME_LENGTH \
@@ -397,39 +400,74 @@ class Bulk(object):
             objs_to_delete = self.get_objs_to_delete(req)
             failed_file_response = {'type': HTTPBadRequest}
             req.environ['eventlet.minimum_write_chunk_size'] = 0
-            for obj_to_delete in objs_to_delete:
-                if last_yield + self.yield_frequency < time():
-                    separator = '\r\n\r\n'
-                    last_yield = time()
-                    yield ' '
-                obj_name = obj_to_delete['name']
-                if not obj_name:
-                    continue
-                if len(failed_files) >= self.max_failed_deletes:
-                    raise HTTPBadRequest('Max delete failures exceeded')
-                if obj_to_delete.get('error'):
-                    if obj_to_delete['error']['code'] == HTTP_NOT_FOUND:
-                        resp_dict['Number Not Found'] += 1
-                    else:
+
+            def delete_filter(predicate, objs_to_delete):
+                for obj_to_delete in objs_to_delete:
+                    obj_name = obj_to_delete['name']
+                    if not obj_name:
+                        continue
+                    if not predicate(obj_name):
+                        continue
+                    if obj_to_delete.get('error'):
+                        if obj_to_delete['error']['code'] == HTTP_NOT_FOUND:
+                            resp_dict['Number Not Found'] += 1
+                        else:
+                            failed_files.append([
+                                quote(obj_name),
+                                obj_to_delete['error']['message']])
+                        continue
+                    delete_path = '/'.join(['', vrs, account,
+                                            obj_name.lstrip('/')])
+                    if not constraints.check_utf8(delete_path):
                         failed_files.append([quote(obj_name),
-                                             obj_to_delete['error']['message']])
-                    continue
-                delete_path = '/'.join(['', vrs, account,
-                                        obj_name.lstrip('/')])
-                if not constraints.check_utf8(delete_path):
-                    failed_files.append([quote(obj_name),
-                                         HTTPPreconditionFailed().status])
-                    continue
+                                             HTTPPreconditionFailed().status])
+                        continue
+                    yield (obj_name, delete_path)
+
+            def objs_then_containers(objs_to_delete):
+                # process all objects first
+                yield delete_filter(lambda name: '/' in name.strip('/'),
+                                    objs_to_delete)
+                # followed by containers
+                yield delete_filter(lambda name: '/' not in name.strip('/'),
+                                    objs_to_delete)
+
+            def do_delete(obj_name, delete_path):
                 new_env = req.environ.copy()
                 new_env['PATH_INFO'] = delete_path
                 del(new_env['wsgi.input'])
                 new_env['CONTENT_LENGTH'] = 0
                 new_env['REQUEST_METHOD'] = 'DELETE'
-                new_env['HTTP_USER_AGENT'] = \
-                    '%s %s' % (req.environ.get('HTTP_USER_AGENT'), user_agent)
+                new_env['HTTP_USER_AGENT'] = '%s %s' % (
+                    req.environ.get('HTTP_USER_AGENT'), user_agent)
                 new_env['swift.source'] = swift_source
-                self._process_delete(delete_path, obj_name, new_env, resp_dict,
-                                     failed_files, failed_file_response)
+                delete_obj_req = Request.blank(delete_path, new_env)
+                return (delete_obj_req.get_response(self.app), obj_name, 0)
+
+            with StreamingPile(self.delete_concurrency) as pile:
+                for names_to_delete in objs_then_containers(objs_to_delete):
+                    for resp, obj_name, retry in pile.asyncstarmap(
+                            do_delete, names_to_delete):
+                        if last_yield + self.yield_frequency < time():
+                            separator = '\r\n\r\n'
+                            last_yield = time()
+                            yield ' '
+                        self._process_delete(resp, pile, obj_name,
+                                             resp_dict, failed_files,
+                                             failed_file_response, retry)
+                        if len(failed_files) >= self.max_failed_deletes:
+                            # Abort, but drain off the in-progress deletes
+                            for resp, obj_name, retry in pile:
+                                if last_yield + self.yield_frequency < time():
+                                    separator = '\r\n\r\n'
+                                    last_yield = time()
+                                    yield ' '
+                                # Don't pass in the pile, as we shouldn't retry
+                                self._process_delete(
+                                    resp, None, obj_name, resp_dict,
+                                    failed_files, failed_file_response, retry)
+                            msg = 'Max delete failures exceeded'
+                            raise HTTPBadRequest(msg)

             if failed_files:
                 resp_dict['Response Status'] = \
@@ -603,10 +641,8 @@ class Bulk(object):
             yield separator + get_response_body(
                 out_content_type, resp_dict, failed_files)

-    def _process_delete(self, delete_path, obj_name, env, resp_dict,
+    def _process_delete(self, resp, pile, obj_name, resp_dict,
                         failed_files, failed_file_response, retry=0):
-        delete_obj_req = Request.blank(delete_path, env)
-        resp = delete_obj_req.get_response(self.app)
         if resp.status_int // 100 == 2:
             resp_dict['Number Deleted'] += 1
         elif resp.status_int == HTTP_NOT_FOUND:
@@ -614,13 +650,16 @@ class Bulk(object):
         elif resp.status_int == HTTP_UNAUTHORIZED:
             failed_files.append([quote(obj_name),
                                  HTTPUnauthorized().status])
-        elif resp.status_int == HTTP_CONFLICT and \
+        elif resp.status_int == HTTP_CONFLICT and pile and \
                 self.retry_count > 0 and self.retry_count > retry:
             retry += 1
             sleep(self.retry_interval ** retry)
-            self._process_delete(delete_path, obj_name, env, resp_dict,
-                                 failed_files, failed_file_response,
-                                 retry)
+            delete_obj_req = Request.blank(resp.environ['PATH_INFO'],
+                                           resp.environ)
+
+            def _retry(req, app, obj_name, retry):
+                return req.get_response(app), obj_name, retry
+            pile.spawn(_retry, delete_obj_req, self.app, obj_name, retry)
         else:
             if resp.status_int // 100 == 5:
                 failed_file_response['type'] = HTTPBadGateway
@@ -664,6 +703,8 @@ def filter_factory(global_conf, **local_conf):
     max_deletes_per_request = int(conf.get('max_deletes_per_request', 10000))
     max_failed_deletes = int(conf.get('max_failed_deletes', 1000))
     yield_frequency = int(conf.get('yield_frequency', 10))
+    delete_concurrency = min(1000, max(1, int(
+        conf.get('delete_concurrency', 2))))
     retry_count = int(conf.get('delete_container_retry_count', 0))
     retry_interval = 1.5

@@ -684,6 +725,7 @@ def filter_factory(global_conf, **local_conf):
         max_deletes_per_request=max_deletes_per_request,
         max_failed_deletes=max_failed_deletes,
         yield_frequency=yield_frequency,
+        delete_concurrency=delete_concurrency,
         retry_count=retry_count,
         retry_interval=retry_interval)
     return bulk_filter
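The objects-then-containers split above is what reduces 409s: a name with a
'/' past any leading slash is an object, while a bare name is a container.
A standalone sketch of the same predicate, outside Swift, with made-up names:

    names = ['/c/f1', 'c/f2', '/c', 'other']

    def is_object(name):
        # mirrors the lambdas passed to delete_filter() above
        return '/' in name.strip('/')

    objects = [n for n in names if is_object(n)]         # ['/c/f1', 'c/f2']
    containers = [n for n in names if not is_object(n)]  # ['/c', 'other']

    # Deleting every object before any container reduces the chance that a
    # container DELETE returns 409 Conflict because it still holds objects.
    for batch in (objects, containers):
        print(batch)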
diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py
index b87c8f2984..88efce6050 100644
--- a/swift/common/middleware/slo.py
+++ b/swift/common/middleware/slo.py
@@ -784,7 +784,9 @@ class StaticLargeObject(object):
             'rate_limit_after_segment', '10'))
         self.rate_limit_segments_per_sec = int(self.conf.get(
             'rate_limit_segments_per_sec', '1'))
-        self.bulk_deleter = Bulk(app, {}, logger=self.logger)
+        delete_concurrency = int(self.conf.get('delete_concurrency', '2'))
+        self.bulk_deleter = Bulk(
+            app, {}, delete_concurrency=delete_concurrency, logger=self.logger)

     def handle_multipart_get_or_head(self, req, start_response):
         """
diff --git a/swift/common/utils.py b/swift/common/utils.py
index a33df51ed8..022c5128d1 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2603,6 +2603,48 @@ class GreenAsyncPile(object):
     __next__ = next


+class StreamingPile(GreenAsyncPile):
+    """
+    Runs jobs in a pool of green threads, spawning more jobs as results are
+    retrieved and worker threads become available.
+
+    When used as a context manager, has the same worker-killing properties as
+    :class:`ContextPool`.
+    """
+    def __init__(self, size):
+        """:param size: number of worker threads to use"""
+        self.pool = ContextPool(size)
+        super(StreamingPile, self).__init__(self.pool)
+
+    def asyncstarmap(self, func, args_iter):
+        """
+        This is the same as :func:`itertools.starmap`, except that *func* is
+        executed in a separate green thread for each item, and results won't
+        necessarily have the same order as inputs.
+        """
+        args_iter = iter(args_iter)
+
+        # Initialize the pile
+        for args in itertools.islice(args_iter, self.pool.size):
+            self.spawn(func, *args)
+
+        # Keep populating the pile as greenthreads become available
+        for args in args_iter:
+            yield next(self)
+            self.spawn(func, *args)
+
+        # Drain the pile
+        for result in self:
+            yield result
+
+    def __enter__(self):
+        self.pool.__enter__()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.pool.__exit__(type, value, traceback)
+
+
 class ModifiedParseResult(ParseResult):
     "Parse results class for urlparse."
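A usage sketch for the StreamingPile added above; eventlet is already a Swift
dependency, while fake_delete and its timing are made up for illustration.
Results stream back as workers finish, so output order may differ from input
order:

    import eventlet

    from swift.common.utils import StreamingPile

    def fake_delete(name, path):
        eventlet.sleep(0.01)  # stand-in for a DELETE round trip
        return 204, name

    args = ((str(i), '/v1/AUTH_test/c/%d' % i) for i in range(10))

    with StreamingPile(3) as pile:  # at most 3 greenthreads in flight
        for status, name in pile.asyncstarmap(fake_delete, args):
            print(status, name)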
diff --git a/test/unit/common/middleware/test_bulk.py b/test/unit/common/middleware/test_bulk.py
index 1888261629..1439b0bd2e 100644
--- a/test/unit/common/middleware/test_bulk.py
+++ b/test/unit/common/middleware/test_bulk.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from collections import Counter
 import numbers
 from six.moves import urllib
 import unittest
@@ -611,10 +612,11 @@ class TestUntar(unittest.TestCase):


 class TestDelete(unittest.TestCase):
+    conf = {'delete_concurrency': 1}  # default to old single-threaded behavior

     def setUp(self):
         self.app = FakeApp()
-        self.bulk = bulk.filter_factory({})(self.app)
+        self.bulk = bulk.filter_factory(self.conf)(self.app)

     def tearDown(self):
         self.app.calls = 0
@@ -729,10 +731,10 @@ class TestDelete(unittest.TestCase):
         req.method = 'POST'
         resp_body = self.handle_delete_and_iter(req)
         self.assertEqual(
-            self.app.delete_paths,
-            ['/delete_works/AUTH_Acc/c/f',
-             '/delete_works/AUTH_Acc/c/f404',
-             '/delete_works/AUTH_Acc/c/%25'])
+            Counter(self.app.delete_paths),
+            Counter(['/delete_works/AUTH_Acc/c/f',
+                     '/delete_works/AUTH_Acc/c/f404',
+                     '/delete_works/AUTH_Acc/c/%25']))
         self.assertEqual(self.app.calls, 3)
         resp_data = utils.json.loads(resp_body)
         self.assertEqual(resp_data['Number Deleted'], 2)
@@ -756,19 +758,20 @@ class TestDelete(unittest.TestCase):
         req.method = 'POST'
         resp_body = self.handle_delete_and_iter(req)
         self.assertEqual(
-            self.app.delete_paths,
-            ['/delete_works/AUTH_Acc/c/ obj \xe2\x99\xa1',
-             '/delete_works/AUTH_Acc/c/ objbadutf8'])
+            Counter(self.app.delete_paths),
+            Counter(['/delete_works/AUTH_Acc/c/ obj \xe2\x99\xa1',
+                     '/delete_works/AUTH_Acc/c/ objbadutf8']))

         self.assertEqual(self.app.calls, 2)
         resp_data = utils.json.loads(resp_body)
         self.assertEqual(resp_data['Number Deleted'], 1)
         self.assertEqual(len(resp_data['Errors']), 2)
-        self.assertEqual(resp_data['Errors'],
-                         [[urllib.parse.quote('c/ objbadutf8'),
-                           '412 Precondition Failed'],
-                          [urllib.parse.quote('/c/f\xdebadutf8'),
-                           '412 Precondition Failed']])
+        self.assertEqual(
+            Counter(map(tuple, resp_data['Errors'])),
+            Counter([(urllib.parse.quote('c/ objbadutf8'),
+                      '412 Precondition Failed'),
+                     (urllib.parse.quote('/c/f\xdebadutf8'),
+                      '412 Precondition Failed')]))

     def test_bulk_delete_no_body(self):
         req = Request.blank('/unauth/AUTH_acc/')
@@ -798,8 +801,9 @@ class TestDelete(unittest.TestCase):
         resp_body = self.handle_delete_and_iter(req)
         resp_data = utils.json.loads(resp_body)
         self.assertEqual(
-            resp_data['Errors'],
-            [['/c/f', '500 Internal Error'], ['c/f2', '500 Internal Error']])
+            Counter(map(tuple, resp_data['Errors'])),
+            Counter([('/c/f', '500 Internal Error'),
+                     ('c/f2', '500 Internal Error')]))
         self.assertEqual(resp_data['Response Status'], '502 Bad Gateway')

     def test_bulk_delete_bad_path(self):
@@ -879,19 +883,91 @@ class TestDelete(unittest.TestCase):
         self.assertTrue('400 Bad Request' in resp_body)

     def test_bulk_delete_max_failures(self):
-        req = Request.blank('/unauth/AUTH_Acc', body='/c/f1\n/c/f2\n/c/f3',
+        body = '\n'.join([
+            '/c/f1', '/c/f2', '/c/f3', '/c/f4', '/c/f5', '/c/f6',
+        ])
+        req = Request.blank('/unauth/AUTH_Acc', body=body,
                             headers={'Accept': 'application/json'})
         req.method = 'POST'
         with patch.object(self.bulk, 'max_failed_deletes', 2):
             resp_body = self.handle_delete_and_iter(req)
-            self.assertEqual(self.app.calls, 2)
+            # We know there should be at least max_failed_deletes, but there
+            # may be more as we clean up in-progress requests.
+            self.assertGreaterEqual(self.app.calls,
+                                    self.bulk.max_failed_deletes)
+            # As we're pulling things off the pile, we:
+            #  - get delete result,
+            #  - process the result,
+            #  - check max_failed_deletes,
+            #  - spawn another delete, repeat.
+            # As a result, we know our app calls should be *strictly* less.
+            # Note this means that when delete_concurrency is one,
+            # self.app.calls will exactly equal self.bulk.max_failed_deletes.
+            self.assertLess(self.app.calls,
+                            self.bulk.max_failed_deletes +
+                            self.bulk.delete_concurrency)
             resp_data = utils.json.loads(resp_body)
             self.assertEqual(resp_data['Response Status'], '400 Bad Request')
             self.assertEqual(resp_data['Response Body'],
                              'Max delete failures exceeded')
-            self.assertEqual(resp_data['Errors'],
-                             [['/c/f1', '401 Unauthorized'],
-                              ['/c/f2', '401 Unauthorized']])
+            self.assertIn(['/c/f1', '401 Unauthorized'], resp_data['Errors'])
+            self.assertIn(['/c/f2', '401 Unauthorized'], resp_data['Errors'])
+
+
+class TestConcurrentDelete(TestDelete):
+    conf = {'delete_concurrency': 3}
+
+    def test_concurrency_set(self):
+        self.assertEqual(self.bulk.delete_concurrency, 3)
+
+
+class TestConfig(unittest.TestCase):
+    def test_defaults(self):
+        expected_defaults = {
+            'delete_concurrency': 2,
+            'max_containers': 10000,
+            'max_deletes_per_request': 10000,
+            'max_failed_deletes': 1000,
+            'max_failed_extractions': 1000,
+            'retry_count': 0,
+            'retry_interval': 1.5,
+            'yield_frequency': 10,
+        }
+
+        filter_app = bulk.filter_factory({})(FakeApp())
+        self.assertEqual(expected_defaults, {k: getattr(filter_app, k)
+                                             for k in expected_defaults})
+
+        filter_app = bulk.Bulk(FakeApp(), None)
+        self.assertEqual(expected_defaults, {k: getattr(filter_app, k)
+                                             for k in expected_defaults})
+
+    def test_delete_concurrency(self):
+        # Must be an integer
+        conf = {'delete_concurrency': '1.5'}
+        self.assertRaises(ValueError, bulk.filter_factory, conf)
+
+        conf = {'delete_concurrency': 'asdf'}
+        self.assertRaises(ValueError, bulk.filter_factory, conf)
+
+        # Will be at least one
+        conf = {'delete_concurrency': '-1'}
+        filter_app = bulk.filter_factory(conf)(FakeApp())
+        self.assertEqual(1, filter_app.delete_concurrency)
+
+        conf = {'delete_concurrency': '0'}
+        filter_app = bulk.filter_factory(conf)(FakeApp())
+        self.assertEqual(1, filter_app.delete_concurrency)
+
+        # But if you want to set it stupid-high, we won't stop you
+        conf = {'delete_concurrency': '1000'}
+        filter_app = bulk.filter_factory(conf)(FakeApp())
+        self.assertEqual(1000, filter_app.delete_concurrency)
+
+        # ...unless it's extra-stupid-high, in which case we cap it
+        conf = {'delete_concurrency': '1001'}
+        filter_app = bulk.filter_factory(conf)(FakeApp())
+        self.assertEqual(1000, filter_app.delete_concurrency)


 class TestSwiftInfo(unittest.TestCase):
diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py
index 79eaddcbf3..b87edf8b5e 100644
--- a/test/unit/common/middleware/test_slo.py
+++ b/test/unit/common/middleware/test_slo.py
@@ -917,15 +917,17 @@ class TestSloDeleteManifest(SloTestCase):
         status, headers, body = self.call_slo(req)
         resp_data = json.loads(body)
         self.assertEqual(
-            self.app.calls,
-            [('GET', '/v1/AUTH_test/deltest/' +
-              'manifest-missing-submanifest?multipart-manifest=get'),
-             ('DELETE', '/v1/AUTH_test/deltest/a_1?multipart-manifest=delete'),
-             ('GET', '/v1/AUTH_test/deltest/' +
-              'missing-submanifest?multipart-manifest=get'),
-             ('DELETE', '/v1/AUTH_test/deltest/d_3?multipart-manifest=delete'),
-             ('DELETE', '/v1/AUTH_test/deltest/' +
-              'manifest-missing-submanifest?multipart-manifest=delete')])
+            set(self.app.calls),
+            set([('GET', '/v1/AUTH_test/deltest/' +
+                  'manifest-missing-submanifest?multipart-manifest=get'),
+                 ('DELETE', '/v1/AUTH_test/deltest/' +
+                  'a_1?multipart-manifest=delete'),
+                 ('GET', '/v1/AUTH_test/deltest/' +
+                  'missing-submanifest?multipart-manifest=get'),
+                 ('DELETE', '/v1/AUTH_test/deltest/' +
+                  'd_3?multipart-manifest=delete'),
+                 ('DELETE', '/v1/AUTH_test/deltest/' +
+                  'manifest-missing-submanifest?multipart-manifest=delete')]))
         self.assertEqual(resp_data['Response Status'], '200 OK')
         self.assertEqual(resp_data['Response Body'], '')
         self.assertEqual(resp_data['Number Deleted'], 3)
@@ -2652,6 +2654,10 @@ class TestSloBulkLogger(unittest.TestCase):
         slo_mware = slo.filter_factory({})('fake app')
         self.assertTrue(slo_mware.logger is slo_mware.bulk_deleter.logger)

+    def test_passes_through_concurrency(self):
+        slo_mware = slo.filter_factory({'delete_concurrency': 5})('fake app')
+        self.assertEqual(5, slo_mware.bulk_deleter.delete_concurrency)
+

 class TestSwiftInfo(unittest.TestCase):
     def setUp(self):
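With the slo change above, a client-initiated manifest delete now issues its
segment DELETEs up to delete_concurrency at a time. A hedged client-side
sketch, again with a placeholder URL and token and the third-party requests
library (none of which are part of the patch):

    import json
    import requests

    resp = requests.delete(
        'http://proxy.example.com/v1/AUTH_test/cont/manifest'  # hypothetical
        '?multipart-manifest=delete',
        headers={'X-Auth-Token': 'AUTH_tk_placeholder',  # hypothetical
                 'Accept': 'application/json'})

    result = json.loads(resp.text.lstrip())  # tolerate keep-alive whitespace
    print(result['Response Status'], result['Number Deleted'])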