From 83d01619910c34f2e3fafeda20bc4f7e7e8f482f Mon Sep 17 00:00:00 2001
From: Tim Burke
Date: Wed, 27 Mar 2019 15:28:50 -0700
Subject: [PATCH] Add operator tool to async-delete some or all objects in a
 container

Adds a tool, swift-container-deleter, that takes an account/container
and optional prefix, marker, and/or end-marker; spins up an internal
client; makes listing requests against the container; and pushes the
found objects into the object-expirer queue with a special
application/async-deleted content-type.

In order to do this enqueuing efficiently, a new internal-to-the-cluster
container method is introduced: UPDATE. It takes a JSON list of object
entries and runs them through merge_items.

The object-expirer is updated to look for work items with this
content-type and skip the X-If-Delete-At check that it would normally
do.

Note that the target-container's listing will continue to show the
objects until data is actually deleted, bypassing some of the concerns
raised in the related change about clearing out a container entirely
and then deleting it.

Change-Id: Ia13ee5da3d1b5c536eccaadc7a6fdcd997374443
Related-Change: I50e403dee75585fc1ff2bb385d6b2d2f13653cf8
---
 setup.cfg                               |   1 +
 swift/cli/container_deleter.py          | 174 +++++++++++++++
 swift/container/server.py               |  26 +++
 swift/obj/expirer.py                    |  75 +++++--
 swift/obj/server.py                     |   3 +-
 swift/proxy/controllers/base.py         |  15 +-
 swift/proxy/controllers/container.py    |  20 ++
 swift/proxy/server.py                   |   6 +-
 test/unit/cli/test_container_deleter.py | 277 ++++++++++++++++++++++++
 test/unit/container/test_server.py      | 117 +++++++++-
 test/unit/obj/test_expirer.py           |  85 ++++++--
 test/unit/proxy/test_server.py          |  23 +-
 12 files changed, 771 insertions(+), 51 deletions(-)
 create mode 100644 swift/cli/container_deleter.py
 create mode 100644 test/unit/cli/test_container_deleter.py

diff --git a/setup.cfg b/setup.cfg
index ed9fb89fce..55ebab232a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -78,6 +78,7 @@ keystone =
 [entry_points]
 console_scripts =
     swift-manage-shard-ranges = swift.cli.manage_shard_ranges:main
+    swift-container-deleter = swift.cli.container_deleter:main
 
 paste.app_factory =
     proxy = swift.proxy.server:app_factory
diff --git a/swift/cli/container_deleter.py b/swift/cli/container_deleter.py
new file mode 100644
index 0000000000..62b65c6dd0
--- /dev/null
+++ b/swift/cli/container_deleter.py
@@ -0,0 +1,174 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy
+# of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''
+Enqueue background jobs to delete portions of a container's namespace.
+
+Accepts prefix, marker, and end-marker args that work as in container
+listings. Objects found in the listing will be marked to be deleted
+by the object-expirer; until the object is actually deleted, it will
+continue to appear in listings.
+
+If there are many objects, this operation may take some time. Stats will
+periodically be emitted so you know the process hasn't hung. These will
+also include the last object marked for deletion; if there is a failure,
+pass this as the ``--marker`` when retrying to minimize duplicative work.
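+
+For example (the account and container names here are only illustrative)::
+
+    swift-container-deleter AUTH_test web-logs --prefix 2019/03/
+
+would mark every object in ``web-logs`` whose name starts with ``2019/03/``
+to be deleted asynchronously.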
+'''
+
+import argparse
+import io
+import itertools
+import json
+import six
+import time
+
+from swift.common.internal_client import InternalClient
+from swift.common.utils import Timestamp, MD5_OF_EMPTY_STRING
+from swift.obj.expirer import build_task_obj, ASYNC_DELETE_TYPE
+
+OBJECTS_PER_UPDATE = 10000
+
+
+def make_delete_jobs(account, container, objects, timestamp):
+    '''
+    Create a list of async-delete jobs
+
+    :param account: (native or unicode string) account to delete from
+    :param container: (native or unicode string) container to delete from
+    :param objects: (list of native or unicode strings) objects to delete
+    :param timestamp: (Timestamp) time at which objects should be marked
+                      deleted
+    :returns: list of dicts appropriate for an UPDATE request to an
+              expiring-object queue
+    '''
+    if six.PY2:
+        if isinstance(account, str):
+            account = account.decode('utf8')
+        if isinstance(container, str):
+            container = container.decode('utf8')
+    return [
+        {
+            'name': build_task_obj(
+                timestamp, account, container,
+                obj.decode('utf8') if six.PY2 and isinstance(obj, str)
+                else obj),
+            'deleted': 0,
+            'created_at': timestamp.internal,
+            'etag': MD5_OF_EMPTY_STRING,
+            'size': 0,
+            'storage_policy_index': 0,
+            'content_type': ASYNC_DELETE_TYPE,
+        } for obj in objects]
+
+
+def mark_for_deletion(swift, account, container, marker, end_marker,
+                      prefix, timestamp=None, yield_time=10):
+    '''
+    Enqueue jobs to async-delete some portion of a container's namespace
+
+    :param swift: InternalClient to use
+    :param account: account to delete from
+    :param container: container to delete from
+    :param marker: only delete objects after this name
+    :param end_marker: only delete objects before this name. Use ``None`` or
+                       empty string to delete to the end of the namespace.
+    :param prefix: only delete objects starting with this prefix
+    :param timestamp: delete all objects as of this time. If ``None``, the
+                      current time will be used.
+    :param yield_time: approximate period with which intermediate results
+                       should be returned. If ``None``, disable intermediate
+                       results.
+    :returns: If ``yield_time`` is ``None``, the number of objects marked for
+              deletion. Otherwise, a generator that will yield out tuples of
+              ``(number of marked objects, last object name)`` approximately
+              every ``yield_time`` seconds. The final tuple will have ``None``
+              as the second element. This form allows you to retry when an
+              error occurs partway through while minimizing duplicate work.
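+
+    A sketch of a typical caller (the account, container, and prefix here
+    are illustrative)::
+
+        for marked, last_name in mark_for_deletion(
+                swift, 'AUTH_test', 'web-logs', '', '', '2019/03/'):
+            if last_name is None:
+                print('Marked %d objects for deletion' % marked)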
+ ''' + if timestamp is None: + timestamp = Timestamp.now() + + def enqueue_deletes(): + deleted = 0 + obj_iter = swift.iter_objects( + account, container, + marker=marker, end_marker=end_marker, prefix=prefix) + time_marker = time.time() + while True: + to_delete = [obj['name'] for obj in itertools.islice( + obj_iter, OBJECTS_PER_UPDATE)] + if not to_delete: + break + delete_jobs = make_delete_jobs( + account, container, to_delete, timestamp) + swift.make_request( + 'UPDATE', + swift.make_path('.expiring_objects', str(int(timestamp))), + headers={'X-Backend-Allow-Method': 'UPDATE', + 'X-Backend-Storage-Policy-Index': '0', + 'X-Timestamp': timestamp.internal}, + acceptable_statuses=(2,), + body_file=io.BytesIO(json.dumps(delete_jobs).encode('ascii'))) + deleted += len(delete_jobs) + if yield_time is not None and \ + time.time() - time_marker > yield_time: + yield deleted, to_delete[-1] + time_marker = time.time() + yield deleted, None + + if yield_time is None: + for deleted, marker in enqueue_deletes(): + if marker is None: + return deleted + else: + return enqueue_deletes() + + +def main(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument('--config', default='/etc/swift/internal-client.conf', + help=('internal-client config file ' + '(default: /etc/swift/internal-client.conf')) + parser.add_argument('--request-tries', type=int, default=3, + help='(default: 3)') + parser.add_argument('account', help='account from which to delete') + parser.add_argument('container', help='container from which to delete') + parser.add_argument( + '--prefix', default='', + help='only delete objects with this prefix (default: none)') + parser.add_argument( + '--marker', default='', + help='only delete objects after this marker (default: none)') + parser.add_argument( + '--end-marker', default='', + help='only delete objects before this end-marker (default: none)') + parser.add_argument( + '--timestamp', type=Timestamp, default=Timestamp.now(), + help='delete all objects as of this time (default: now)') + args = parser.parse_args() + + swift = InternalClient( + args.config, 'Swift Container Deleter', args.request_tries) + for deleted, marker in mark_for_deletion( + swift, args.account, args.container, + args.marker, args.end_marker, args.prefix, args.timestamp): + if marker is None: + print('Finished. Marked %d objects for deletion.' % deleted) + else: + print('Marked %d objects for deletion, through %r' % ( + deleted, marker)) + + +if __name__ == '__main__': + main() diff --git a/swift/container/server.py b/swift/container/server.py index 1bc21bb71c..66e47ddf36 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -751,6 +751,32 @@ class ContainerController(BaseStorageServer): ret.request = req return ret + @public + @timing_stats() + def UPDATE(self, req): + """ + Handle HTTP UPDATE request (merge_items RPCs coming from the proxy.) 
+ """ + drive, part, account, container = split_and_validate_path(req, 4) + req_timestamp = valid_timestamp(req) + try: + check_drive(self.root, drive, self.mount_check) + except ValueError: + return HTTPInsufficientStorage(drive=drive, request=req) + if not self.check_free_space(drive): + return HTTPInsufficientStorage(drive=drive, request=req) + + requested_policy_index = self.get_and_validate_policy_index(req) + broker = self._get_container_broker(drive, part, account, container) + self._maybe_autocreate(broker, req_timestamp, account, + requested_policy_index) + try: + objs = json.load(req.environ['wsgi.input']) + except ValueError as err: + return HTTPBadRequest(body=str(err), content_type='text/plain') + broker.merge_items(objs) + return HTTPAccepted(request=req) + @public @timing_stats() def POST(self, req): diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 6820d526d4..28f1142180 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -28,7 +28,7 @@ from eventlet.greenpool import GreenPool from swift.common.daemon import Daemon from swift.common.internal_client import InternalClient, UnexpectedResponse from swift.common.utils import get_logger, dump_recon_cache, split_path, \ - Timestamp, config_true_value + Timestamp, config_true_value, normalize_delete_at_timestamp from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ HTTP_PRECONDITION_FAILED from swift.common.swob import wsgi_quote, str_to_wsgi @@ -36,6 +36,34 @@ from swift.common.swob import wsgi_quote, str_to_wsgi from swift.container.reconciler import direct_delete_container_entry MAX_OBJECTS_TO_CACHE = 100000 +ASYNC_DELETE_TYPE = 'application/async-deleted' + + +def build_task_obj(timestamp, target_account, target_container, + target_obj): + """ + :return: a task object name in format of + "-//" + """ + timestamp = Timestamp(timestamp) + return '%s-%s/%s/%s' % ( + normalize_delete_at_timestamp(timestamp), + target_account, target_container, target_obj) + + +def parse_task_obj(task_obj): + """ + :param task_obj: a task object name in format of + "-/" + + "/" + :return: 4-tuples of (delete_at_time, target_account, target_container, + target_obj) + """ + timestamp, target_path = task_obj.split('-', 1) + timestamp = Timestamp(timestamp) + target_account, target_container, target_obj = \ + split_path('/' + target_path, 3, 3, True) + return timestamp, target_account, target_container, target_obj class ObjectExpirer(Daemon): @@ -123,18 +151,7 @@ class ObjectExpirer(Daemon): self.report_last_time = time() def parse_task_obj(self, task_obj): - """ - :param task_obj: a task object name in format of - "-/" + - "/" - :return: 4-tuples of (delete_at_time, target_account, target_container, - target_obj) - """ - timestamp, target_path = task_obj.split('-', 1) - timestamp = Timestamp(timestamp) - target_account, target_container, target_obj = \ - split_path('/' + target_path, 3, 3, True) - return timestamp, target_account, target_container, target_obj + return parse_task_obj(task_obj) def round_robin_order(self, task_iter): """ @@ -238,7 +255,7 @@ class ObjectExpirer(Daemon): task_object = o['name'] try: delete_timestamp, target_account, target_container, \ - target_object = self.parse_task_obj(task_object) + target_object = parse_task_obj(task_object) except ValueError: self.logger.exception('Unexcepted error handling task %r' % task_object) @@ -253,12 +270,14 @@ class ObjectExpirer(Daemon): divisor) != my_index: continue + is_async = o.get('content_type') == ASYNC_DELETE_TYPE yield {'task_account': 
task_account, 'task_container': task_container, 'task_object': task_object, 'target_path': '/'.join([ target_account, target_container, target_object]), - 'delete_timestamp': delete_timestamp} + 'delete_timestamp': delete_timestamp, + 'is_async_delete': is_async} def run_once(self, *args, **kwargs): """ @@ -390,11 +409,13 @@ class ObjectExpirer(Daemon): 'process must be less than processes') def delete_object(self, target_path, delete_timestamp, - task_account, task_container, task_object): + task_account, task_container, task_object, + is_async_delete): start_time = time() try: try: - self.delete_actual_object(target_path, delete_timestamp) + self.delete_actual_object(target_path, delete_timestamp, + is_async_delete) except UnexpectedResponse as err: if err.resp.status_int not in {HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED}: @@ -431,7 +452,7 @@ class ObjectExpirer(Daemon): direct_delete_container_entry(self.swift.container_ring, task_account, task_container, task_object) - def delete_actual_object(self, actual_obj, timestamp): + def delete_actual_object(self, actual_obj, timestamp, is_async_delete): """ Deletes the end-user object indicated by the actual object name given '//' if and only if the X-Delete-At value @@ -442,13 +463,19 @@ class ObjectExpirer(Daemon): :param timestamp: The swift.common.utils.Timestamp instance the X-Delete-At value must match to perform the actual delete. + :param is_async_delete: False if the object should be deleted because + of "normal" expiration, or True if it should + be async-deleted. :raises UnexpectedResponse: if the delete was unsuccessful and should be retried later """ path = '/v1/' + wsgi_quote(str_to_wsgi(actual_obj.lstrip('/'))) - self.swift.make_request( - 'DELETE', path, - {'X-If-Delete-At': timestamp.normal, - 'X-Timestamp': timestamp.normal, - 'X-Backend-Clean-Expiring-Object-Queue': 'no'}, - (2, HTTP_CONFLICT)) + if is_async_delete: + headers = {'X-Timestamp': timestamp.normal} + acceptable_statuses = (2, HTTP_CONFLICT, HTTP_NOT_FOUND) + else: + headers = {'X-Timestamp': timestamp.normal, + 'X-If-Delete-At': timestamp.normal, + 'X-Backend-Clean-Expiring-Object-Queue': 'no'} + acceptable_statuses = (2, HTTP_CONFLICT) + self.swift.make_request('DELETE', path, headers, acceptable_statuses) diff --git a/swift/obj/server.py b/swift/obj/server.py index 0519bcb53e..ed6d31881c 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -57,6 +57,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPInsufficientStorage, HTTPForbidden, HTTPException, HTTPConflict, \ HTTPServerError, wsgi_to_bytes, wsgi_to_str from swift.obj.diskfile import RESERVED_DATAFILE_META, DiskFileRouter +from swift.obj.expirer import build_task_obj def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size): @@ -493,7 +494,7 @@ class ObjectController(BaseStorageServer): for host, contdevice in updates: self.async_update( op, self.expiring_objects_account, delete_at_container, - '%s-%s/%s/%s' % (delete_at, account, container, obj), + build_task_obj(delete_at, account, container, obj), host, partition, contdevice, headers_out, objdevice, policy) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index d7e668c45f..160ed4316f 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1643,7 +1643,7 @@ class Controller(object): return info def _make_request(self, nodes, part, method, path, headers, query, - logger_thread_locals): + body, logger_thread_locals): """ Iterates over 
the given node iterator, sending an HTTP request to one node at a time. The first non-informational, non-server-error @@ -1657,12 +1657,18 @@ class Controller(object): (full path ends up being /<$device>/<$part>/<$path>) :param headers: dictionary of headers :param query: query string to send to the backend. + :param body: byte string to use as the request body. + Try to keep it small. :param logger_thread_locals: The thread local values to be set on the self.app.logger to retain transaction logging information. :returns: a swob.Response object, or None if no responses were received """ self.app.logger.thread_locals = logger_thread_locals + if body: + if not isinstance(body, bytes): + raise TypeError('body must be bytes, not %s' % type(body)) + headers['Content-Length'] = str(len(body)) for node in nodes: try: start_node_timing = time.time() @@ -1672,6 +1678,9 @@ class Controller(object): headers=headers, query_string=query) conn.node = node self.app.set_node_timing(node, time.time() - start_node_timing) + if body: + with Timeout(self.app.node_timeout): + conn.send(body) with Timeout(self.app.node_timeout): resp = conn.getresponse() if not is_informational(resp.status) and \ @@ -1698,7 +1707,7 @@ class Controller(object): def make_requests(self, req, ring, part, method, path, headers, query_string='', overrides=None, node_count=None, - node_iterator=None): + node_iterator=None, body=None): """ Sends an HTTP request to multiple nodes and aggregates the results. It attempts the primary nodes concurrently, then iterates over the @@ -1727,7 +1736,7 @@ class Controller(object): for head in headers: pile.spawn(self._make_request, nodes, part, method, path, - head, query_string, self.app.logger.thread_locals) + head, query_string, body, self.app.logger.thread_locals) response = [] statuses = [] for resp in pile: diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index fb29c7fdf0..58f2a569d2 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -356,6 +356,26 @@ class ContainerController(Controller): return HTTPNotFound(request=req) return resp + def UPDATE(self, req): + """HTTP UPDATE request handler. + + Method to perform bulk operations on container DBs, + similar to a merge_items REPLICATE request. + + Not client facing; internal clients or middlewares must include + ``X-Backend-Allow-Method: UPDATE`` header to access. 
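+
+        A minimal sketch of a caller (mirroring swift.cli.container_deleter;
+        ``swift`` is an InternalClient and ``delete_jobs`` is a list of
+        object records, both assumed here)::
+
+            swift.make_request(
+                'UPDATE',
+                swift.make_path('.expiring_objects', '1558463777'),
+                headers={'X-Backend-Allow-Method': 'UPDATE',
+                         'X-Backend-Storage-Policy-Index': '0',
+                         'X-Timestamp': Timestamp.now().internal},
+                acceptable_statuses=(2,),
+                body_file=io.BytesIO(json.dumps(delete_jobs).encode('ascii')))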
+ """ + container_partition, containers = self.app.container_ring.get_nodes( + self.account_name, self.container_name) + # Since this isn't client facing, expect callers to supply an index + policy_index = req.headers['X-Backend-Storage-Policy-Index'] + headers = self._backend_requests( + req, len(containers), account_partition=None, accounts=[], + policy_index=policy_index) + return self.make_requests( + req, self.app.container_ring, container_partition, 'UPDATE', + req.swift_entity_path, headers, body=req.body) + def _backend_requests(self, req, n_outgoing, account_partition, accounts, policy_index=None): additional = {'X-Timestamp': Timestamp.now().internal} diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 2358c41ec2..082a6d2f79 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -507,8 +507,12 @@ class Application(object): controller.trans_id = req.environ['swift.trans_id'] self.logger.client_ip = get_remote_client(req) - if req.method not in controller.allowed_methods: + allowed_methods = set(controller.allowed_methods) + if 'X-Backend-Allow-Method' in req.headers: + allowed_methods.add(req.headers['X-Backend-Allow-Method']) + if req.method not in allowed_methods: return HTTPMethodNotAllowed(request=req, headers={ + # Only advertise the *controller's* allowed_methods 'Allow': ', '.join(controller.allowed_methods)}) handler = getattr(controller, req.method) diff --git a/test/unit/cli/test_container_deleter.py b/test/unit/cli/test_container_deleter.py new file mode 100644 index 0000000000..7c4aaa2bed --- /dev/null +++ b/test/unit/cli/test_container_deleter.py @@ -0,0 +1,277 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import collections +import itertools +import json +import mock +import six +import unittest + +from swift.cli import container_deleter +from swift.common import internal_client +from swift.common import swob +from swift.common import utils + +AppCall = collections.namedtuple('AppCall', [ + 'method', 'path', 'query', 'headers', 'body']) + + +class FakeInternalClient(internal_client.InternalClient): + def __init__(self, responses): + self.resp_iter = iter(responses) + self.calls = [] + + def make_request(self, method, path, headers, acceptable_statuses, + body_file=None, params=None): + if body_file is None: + body = None + else: + body = body_file.read() + path, _, query = path.partition('?') + self.calls.append(AppCall(method, path, query, headers, body)) + resp = next(self.resp_iter) + if isinstance(resp, Exception): + raise resp + return resp + + def __enter__(self): + return self + + def __exit__(self, *args): + unused_responses = [r for r in self.resp_iter] + if unused_responses: + raise Exception('Unused responses: %r' % unused_responses) + + +class TestContainerDeleter(unittest.TestCase): + def setUp(self): + patcher = mock.patch.object(container_deleter.time, 'time', + side_effect=itertools.count()) + patcher.__enter__() + self.addCleanup(patcher.__exit__) + + patcher = mock.patch.object(container_deleter, 'OBJECTS_PER_UPDATE', 5) + patcher.__enter__() + self.addCleanup(patcher.__exit__) + + def test_make_delete_jobs(self): + ts = '1558463777.42739' + self.assertEqual( + container_deleter.make_delete_jobs( + 'acct', 'cont', ['obj1', 'obj2'], + utils.Timestamp(ts)), + [{'name': ts.split('.')[0] + '-acct/cont/obj1', + 'deleted': 0, + 'created_at': ts, + 'etag': utils.MD5_OF_EMPTY_STRING, + 'size': 0, + 'storage_policy_index': 0, + 'content_type': 'application/async-deleted'}, + {'name': ts.split('.')[0] + '-acct/cont/obj2', + 'deleted': 0, + 'created_at': ts, + 'etag': utils.MD5_OF_EMPTY_STRING, + 'size': 0, + 'storage_policy_index': 0, + 'content_type': 'application/async-deleted'}]) + + def test_make_delete_jobs_native_utf8(self): + ts = '1558463777.42739' + uacct = acct = u'acct-\U0001f334' + ucont = cont = u'cont-\N{SNOWMAN}' + uobj1 = obj1 = u'obj-\N{GREEK CAPITAL LETTER ALPHA}' + uobj2 = obj2 = u'obj-\N{GREEK CAPITAL LETTER OMEGA}' + if six.PY2: + acct = acct.encode('utf8') + cont = cont.encode('utf8') + obj1 = obj1.encode('utf8') + obj2 = obj2.encode('utf8') + self.assertEqual( + container_deleter.make_delete_jobs( + acct, cont, [obj1, obj2], utils.Timestamp(ts)), + [{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], uacct, ucont, uobj1), + 'deleted': 0, + 'created_at': ts, + 'etag': utils.MD5_OF_EMPTY_STRING, + 'size': 0, + 'storage_policy_index': 0, + 'content_type': 'application/async-deleted'}, + {'name': u'%s-%s/%s/%s' % (ts.split('.')[0], uacct, ucont, uobj2), + 'deleted': 0, + 'created_at': ts, + 'etag': utils.MD5_OF_EMPTY_STRING, + 'size': 0, + 'storage_policy_index': 0, + 'content_type': 'application/async-deleted'}]) + + def test_make_delete_jobs_unicode_utf8(self): + ts = '1558463777.42739' + acct = u'acct-\U0001f334' + cont = u'cont-\N{SNOWMAN}' + obj1 = u'obj-\N{GREEK CAPITAL LETTER ALPHA}' + obj2 = u'obj-\N{GREEK CAPITAL LETTER OMEGA}' + self.assertEqual( + container_deleter.make_delete_jobs( + acct, cont, [obj1, obj2], utils.Timestamp(ts)), + [{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], acct, cont, obj1), + 'deleted': 0, + 'created_at': ts, + 'etag': utils.MD5_OF_EMPTY_STRING, + 'size': 0, + 'storage_policy_index': 0, + 'content_type': 'application/async-deleted'}, 
+ {'name': u'%s-%s/%s/%s' % (ts.split('.')[0], acct, cont, obj2), + 'deleted': 0, + 'created_at': ts, + 'etag': utils.MD5_OF_EMPTY_STRING, + 'size': 0, + 'storage_policy_index': 0, + 'content_type': 'application/async-deleted'}]) + + def test_mark_for_deletion_empty_no_yield(self): + with FakeInternalClient([ + swob.Response(json.dumps([ + ])), + ]) as swift: + self.assertEqual(container_deleter.mark_for_deletion( + swift, + 'account', + 'container', + 'marker', + 'end', + 'prefix', + timestamp=None, + yield_time=None, + ), 0) + self.assertEqual(swift.calls, [ + ('GET', '/v1/account/container', + 'format=json&marker=marker&end_marker=end&prefix=prefix', + {}, None), + ]) + + def test_mark_for_deletion_empty_with_yield(self): + with FakeInternalClient([ + swob.Response(json.dumps([ + ])), + ]) as swift: + self.assertEqual(list(container_deleter.mark_for_deletion( + swift, + 'account', + 'container', + 'marker', + 'end', + 'prefix', + timestamp=None, + yield_time=0.5, + )), [(0, None)]) + self.assertEqual(swift.calls, [ + ('GET', '/v1/account/container', + 'format=json&marker=marker&end_marker=end&prefix=prefix', + {}, None), + ]) + + def test_mark_for_deletion_one_update_no_yield(self): + ts = '1558463777.42739' + with FakeInternalClient([ + swob.Response(json.dumps([ + {'name': 'obj1'}, + {'name': 'obj2'}, + {'name': 'obj3'}, + ])), + swob.Response(json.dumps([ + ])), + swob.Response(status=202), + ]) as swift: + self.assertEqual(container_deleter.mark_for_deletion( + swift, + 'account', + 'container', + '', + '', + '', + timestamp=utils.Timestamp(ts), + yield_time=None, + ), 3) + self.assertEqual(swift.calls, [ + ('GET', '/v1/account/container', + 'format=json&marker=&end_marker=&prefix=', {}, None), + ('GET', '/v1/account/container', + 'format=json&marker=obj3&end_marker=&prefix=', {}, None), + ('UPDATE', '/v1/.expiring_objects/' + ts.split('.')[0], '', { + 'X-Backend-Allow-Method': 'UPDATE', + 'X-Backend-Storage-Policy-Index': '0', + 'X-Timestamp': ts}, mock.ANY), + ]) + self.assertEqual( + json.loads(swift.calls[-1].body), + container_deleter.make_delete_jobs( + 'account', 'container', ['obj1', 'obj2', 'obj3'], + utils.Timestamp(ts) + ) + ) + + def test_mark_for_deletion_two_updates_with_yield(self): + ts = '1558463777.42739' + with FakeInternalClient([ + swob.Response(json.dumps([ + {'name': 'obj1'}, + {'name': 'obj2'}, + {'name': 'obj3'}, + {'name': u'obj4-\N{SNOWMAN}'}, + {'name': 'obj5'}, + {'name': 'obj6'}, + ])), + swob.Response(status=202), + swob.Response(json.dumps([ + ])), + swob.Response(status=202), + ]) as swift: + self.assertEqual(list(container_deleter.mark_for_deletion( + swift, + 'account', + 'container', + '', + '', + '', + timestamp=utils.Timestamp(ts), + yield_time=0, + )), [(5, 'obj5'), (6, 'obj6'), (6, None)]) + self.assertEqual(swift.calls, [ + ('GET', '/v1/account/container', + 'format=json&marker=&end_marker=&prefix=', {}, None), + ('UPDATE', '/v1/.expiring_objects/' + ts.split('.')[0], '', { + 'X-Backend-Allow-Method': 'UPDATE', + 'X-Backend-Storage-Policy-Index': '0', + 'X-Timestamp': ts}, mock.ANY), + ('GET', '/v1/account/container', + 'format=json&marker=obj6&end_marker=&prefix=', {}, None), + ('UPDATE', '/v1/.expiring_objects/' + ts.split('.')[0], '', { + 'X-Backend-Allow-Method': 'UPDATE', + 'X-Backend-Storage-Policy-Index': '0', + 'X-Timestamp': ts}, mock.ANY), + ]) + self.assertEqual( + json.loads(swift.calls[-3].body), + container_deleter.make_delete_jobs( + 'account', 'container', + ['obj1', 'obj2', 'obj3', u'obj4-\N{SNOWMAN}', 'obj5'], + 
utils.Timestamp(ts) + ) + ) + self.assertEqual( + json.loads(swift.calls[-1].body), + container_deleter.make_delete_jobs( + 'account', 'container', ['obj6'], + utils.Timestamp(ts) + ) + ) diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index aac76521ec..c45d4bb7c2 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -354,10 +354,8 @@ class TestContainerController(unittest.TestCase): req.content_length = 0 resp = server_handler.OPTIONS(req) self.assertEqual(200, resp.status_int) - for verb in 'OPTIONS GET POST PUT DELETE HEAD REPLICATE'.split(): - self.assertTrue( - verb in resp.headers['Allow'].split(', ')) - self.assertEqual(len(resp.headers['Allow'].split(', ')), 7) + self.assertEqual(sorted(resp.headers['Allow'].split(', ')), sorted( + 'OPTIONS GET POST PUT DELETE HEAD REPLICATE UPDATE'.split())) self.assertEqual(resp.headers['Server'], (self.controller.server_type + '/' + swift_version)) @@ -1477,6 +1475,115 @@ class TestContainerController(unittest.TestCase): self.assertEqual(mock_statvfs.mock_calls, [mock.call(os.path.join(self.testdir, 'sda1'))]) + def test_UPDATE(self): + ts_iter = make_timestamp_iter() + req = Request.blank( + '/sda1/p/a/c', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': next(ts_iter).internal}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 201) + + ts_iter = make_timestamp_iter() + req = Request.blank( + '/sda1/p/a/c', + environ={'REQUEST_METHOD': 'UPDATE'}, + headers={'X-Timestamp': next(ts_iter).internal}, + body='[invalid json') + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 400) + + ts_iter = make_timestamp_iter() + req = Request.blank( + '/sda1/p/a/c', + environ={'REQUEST_METHOD': 'GET'}, + headers={'X-Timestamp': next(ts_iter).internal}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 204) + + obj_ts = next(ts_iter) + req = Request.blank( + '/sda1/p/a/c', + environ={'REQUEST_METHOD': 'UPDATE'}, + headers={'X-Timestamp': next(ts_iter).internal}, + body=json.dumps([ + {'name': 'some obj', 'deleted': 0, + 'created_at': obj_ts.internal, + 'etag': 'whatever', 'size': 1234, + 'storage_policy_index': POLICIES.default.idx, + 'content_type': 'foo/bar'}, + {'name': 'some tombstone', 'deleted': 1, + 'created_at': next(ts_iter).internal, + 'etag': 'noetag', 'size': 0, + 'storage_policy_index': POLICIES.default.idx, + 'content_type': 'application/deleted'}, + {'name': 'wrong policy', 'deleted': 0, + 'created_at': next(ts_iter).internal, + 'etag': 'whatever', 'size': 6789, + 'storage_policy_index': 1, + 'content_type': 'foo/bar'}, + ])) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 202) + + req = Request.blank( + '/sda1/p/a/c?format=json', + environ={'REQUEST_METHOD': 'GET'}, + headers={'X-Timestamp': next(ts_iter).internal}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(json.loads(resp.body), [ + {'name': 'some obj', 'hash': 'whatever', 'bytes': 1234, + 'content_type': 'foo/bar', 'last_modified': obj_ts.isoformat}, + ]) + + def test_UPDATE_autocreate(self): + ts_iter = make_timestamp_iter() + req = Request.blank( + '/sda1/p/.a/c', + environ={'REQUEST_METHOD': 'GET'}, + headers={'X-Timestamp': next(ts_iter).internal}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 404) + + obj_ts = next(ts_iter) + req = Request.blank( + '/sda1/p/.a/c', + environ={'REQUEST_METHOD': 
'UPDATE'}, + headers={ + 'X-Timestamp': next(ts_iter).internal, + 'X-Backend-Storage-Policy-Index': str(POLICIES.default.idx)}, + body=json.dumps([ + {'name': 'some obj', 'deleted': 0, + 'created_at': obj_ts.internal, + 'etag': 'whatever', 'size': 1234, + 'storage_policy_index': POLICIES.default.idx, + 'content_type': 'foo/bar'}, + {'name': 'some tombstone', 'deleted': 1, + 'created_at': next(ts_iter).internal, + 'etag': 'noetag', 'size': 0, + 'storage_policy_index': POLICIES.default.idx, + 'content_type': 'application/deleted'}, + {'name': 'wrong policy', 'deleted': 0, + 'created_at': next(ts_iter).internal, + 'etag': 'whatever', 'size': 6789, + 'storage_policy_index': 1, + 'content_type': 'foo/bar'}, + ])) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 202, resp.body) + + req = Request.blank( + '/sda1/p/.a/c?format=json', + environ={'REQUEST_METHOD': 'GET'}, + headers={'X-Timestamp': next(ts_iter).internal}) + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(json.loads(resp.body), [ + {'name': 'some obj', 'hash': 'whatever', 'bytes': 1234, + 'content_type': 'foo/bar', 'last_modified': obj_ts.isoformat}, + ]) + def test_DELETE(self): ts_iter = make_timestamp_iter() req = Request.blank( @@ -4591,7 +4698,7 @@ class TestNonLegacyDefaultStoragePolicy(TestContainerController): def _update_object_put_headers(self, req): """ Add policy index headers for containers created with default policy - - which in this TestCase is 1. + - which in this TestCase is 2. """ req.headers['X-Backend-Storage-Policy-Index'] = \ str(POLICIES.default.idx) diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 1d2a0fb71c..cfd61ed3c8 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -254,7 +254,8 @@ class TestObjectExpirer(TestCase): self.deleted_objects = {} def delete_object(self, target_path, delete_timestamp, - task_account, task_container, task_object): + task_account, task_container, task_object, + is_async_delete): if task_container not in self.deleted_objects: self.deleted_objects[task_container] = set() self.deleted_objects[task_container].add(task_object) @@ -303,9 +304,10 @@ class TestObjectExpirer(TestCase): with mock.patch.object(x, 'delete_actual_object', side_effect=exc) as delete_actual: with mock.patch.object(x, 'pop_queue') as pop_queue: - x.delete_object(actual_obj, ts, account, container, obj) + x.delete_object(actual_obj, ts, account, container, obj, + False) - delete_actual.assert_called_once_with(actual_obj, ts) + delete_actual.assert_called_once_with(actual_obj, ts, False) log_lines = x.logger.get_lines_for_level('error') if should_pop: pop_queue.assert_called_once_with(account, container, obj) @@ -377,13 +379,14 @@ class TestObjectExpirer(TestCase): assert_parse_task_obj('1000-a/c/o', 1000, 'a', 'c', 'o') assert_parse_task_obj('0000-acc/con/obj', 0, 'acc', 'con', 'obj') - def make_task(self, delete_at, target): + def make_task(self, delete_at, target, is_async_delete=False): return { 'task_account': '.expiring_objects', 'task_container': delete_at, 'task_object': delete_at + '-' + target, 'delete_timestamp': Timestamp(delete_at), 'target_path': target, + 'is_async_delete': is_async_delete, } def test_round_robin_order(self): @@ -620,7 +623,7 @@ class TestObjectExpirer(TestCase): # executed tasks are with past time self.assertEqual( mock_method.call_args_list, - [mock.call(target_path, self.past_time) + [mock.call(target_path, self.past_time, False) for 
target_path in self.expired_target_path_list]) def test_failed_delete_keeps_entry(self): @@ -638,7 +641,7 @@ class TestObjectExpirer(TestCase): # all tasks are done with mock.patch.object(self.expirer, 'delete_actual_object', - lambda o, t: None), \ + lambda o, t, b: None), \ mock.patch.object(self.expirer, 'pop_queue') as mock_method: self.expirer.run_once() @@ -653,7 +656,7 @@ class TestObjectExpirer(TestCase): self.assertEqual(self.expirer.report_objects, 0) with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0), \ mock.patch.object(self.expirer, 'delete_actual_object', - lambda o, t: None), \ + lambda o, t, b: None), \ mock.patch.object(self.expirer, 'pop_queue', lambda a, c, o: None): self.expirer.run_once() @@ -662,7 +665,8 @@ class TestObjectExpirer(TestCase): def test_delete_actual_object_gets_native_string(self): got_str = [False] - def delete_actual_object_test_for_string(actual_obj, timestamp): + def delete_actual_object_test_for_string(actual_obj, timestamp, + is_async_delete): if isinstance(actual_obj, str): got_str[0] = True @@ -681,7 +685,7 @@ class TestObjectExpirer(TestCase): def fail_delete_container(*a, **kw): raise Exception('failed to delete container') - def fail_delete_actual_object(actual_obj, timestamp): + def fail_delete_actual_object(actual_obj, timestamp, is_async_delete): raise Exception('failed to delete actual object') with mock.patch.object(self.fake_swift, 'delete_container', @@ -761,10 +765,30 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) ts = Timestamp('1234') - x.delete_actual_object('/path/to/object', ts) + x.delete_actual_object('/path/to/object', ts, False) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], got_env[0]['HTTP_X_IF_DELETE_AT']) + self.assertEqual( + got_env[0]['HTTP_X_BACKEND_CLEAN_EXPIRING_OBJECT_QUEUE'], 'no') + + def test_delete_actual_object_bulk(self): + got_env = [None] + + def fake_app(env, start_response): + got_env[0] = env + start_response('204 No Content', [('Content-Length', '0')]) + return [] + + internal_client.loadapp = lambda *a, **kw: fake_app + + x = expirer.ObjectExpirer({}) + ts = Timestamp('1234') + x.delete_actual_object('/path/to/object', ts, True) + self.assertNotIn('HTTP_X_IF_DELETE_AT', got_env[0]) + self.assertNotIn('HTTP_X_BACKEND_CLEAN_EXPIRING_OBJECT_QUEUE', + got_env[0]) + self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], ts.internal) def test_delete_actual_object_nourlquoting(self): # delete_actual_object should not do its own url quoting because @@ -780,12 +804,41 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) ts = Timestamp('1234') - x.delete_actual_object('/path/to/object name', ts) + x.delete_actual_object('/path/to/object name', ts, False) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], got_env[0]['HTTP_X_IF_DELETE_AT']) self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name') + def test_delete_actual_object_async_returns_expected_error(self): + def do_test(test_status, should_raise): + calls = [0] + + def fake_app(env, start_response): + calls[0] += 1 + calls.append(env['PATH_INFO']) + start_response(test_status, [('Content-Length', '0')]) + return [] + + internal_client.loadapp = lambda *a, **kw: fake_app + + x = expirer.ObjectExpirer({}) + ts = Timestamp('1234') + if should_raise: + with self.assertRaises(internal_client.UnexpectedResponse): + x.delete_actual_object('/path/to/object', ts, True) + else: + 
x.delete_actual_object('/path/to/object', ts, True) + self.assertEqual(calls[0], 1, calls) + + # object was deleted and tombstone reaped + do_test('404 Not Found', False) + # object was overwritten *after* the original delete, or + # object was deleted but tombstone still exists, or ... + do_test('409 Conflict', False) + # Anything else, raise + do_test('400 Bad Request', True) + def test_delete_actual_object_returns_expected_error(self): def do_test(test_status, should_raise): calls = [0] @@ -801,9 +854,9 @@ class TestObjectExpirer(TestCase): ts = Timestamp('1234') if should_raise: with self.assertRaises(internal_client.UnexpectedResponse): - x.delete_actual_object('/path/to/object', ts) + x.delete_actual_object('/path/to/object', ts, False) else: - x.delete_actual_object('/path/to/object', ts) + x.delete_actual_object('/path/to/object', ts, False) self.assertEqual(calls[0], 1) # object was deleted and tombstone reaped @@ -828,7 +881,7 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) exc = None try: - x.delete_actual_object('/path/to/object', Timestamp('1234')) + x.delete_actual_object('/path/to/object', Timestamp('1234'), False) except Exception as err: exc = err finally: @@ -841,7 +894,7 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) x.swift.make_request = mock.Mock() x.swift.make_request.return_value.status_int = 204 - x.delete_actual_object(name, timestamp) + x.delete_actual_object(name, timestamp, False) self.assertEqual(x.swift.make_request.call_count, 1) self.assertEqual(x.swift.make_request.call_args[0][1], '/v1/' + urllib.parse.quote(name)) @@ -851,7 +904,7 @@ class TestObjectExpirer(TestCase): timestamp = Timestamp('1515544858.80602') x = expirer.ObjectExpirer({}) x.swift.make_request = mock.MagicMock() - x.delete_actual_object(name, timestamp) + x.delete_actual_object(name, timestamp, False) self.assertEqual(x.swift.make_request.call_count, 1) header = 'X-Backend-Clean-Expiring-Object-Queue' self.assertEqual( diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index f6d5b6c572..6a4cf86c18 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -321,7 +321,7 @@ class TestController(unittest.TestCase): self.controller.account_info(self.account, self.request) set_http_connect(201, raise_timeout_exc=True) self.controller._make_request( - nodes, partition, 'POST', '/', '', '', + nodes, partition, 'POST', '/', '', '', None, self.controller.app.logger.thread_locals) # tests if 200 is cached and used @@ -668,6 +668,27 @@ class TestProxyServer(unittest.TestCase): Request.blank('/v1/a', environ={'REQUEST_METHOD': '!invalid'})) self.assertEqual(resp.status, '405 Method Not Allowed') + def test_private_method_request(self): + baseapp = proxy_server.Application({}, + FakeMemcache(), + container_ring=FakeRing(), + account_ring=FakeRing()) + baseapp.logger = debug_logger() + resp = baseapp.handle_request( + Request.blank('/v1/a/c', environ={'REQUEST_METHOD': 'UPDATE'})) + self.assertEqual(resp.status, '405 Method Not Allowed') + # Note that UPDATE definitely *isn't* advertised + self.assertEqual(sorted(resp.headers['Allow'].split(', ')), [ + 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'POST', 'PUT']) + + # But with appropriate (internal-only) overrides, you can still use it + resp = baseapp.handle_request( + Request.blank('/v1/a/c', environ={'REQUEST_METHOD': 'UPDATE'}, + headers={'X-Backend-Allow-Method': 'UPDATE', + 'X-Backend-Storage-Policy-Index': '0'})) + # Now we actually make the requests, but there 
aren't any nodes + self.assertEqual(resp.status, '503 Service Unavailable') + def test_calls_authorize_allow(self): called = [False]