diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 2e9ae09e18..4d5d606de6 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -19,6 +19,7 @@ from random import random from time import time from os.path import join from swift import gettext_ as _ +from collections import defaultdict, deque import hashlib from eventlet import sleep, Timeout @@ -26,7 +27,8 @@ from eventlet.greenpool import GreenPool from swift.common.daemon import Daemon from swift.common.internal_client import InternalClient, UnexpectedResponse -from swift.common.utils import get_logger, dump_recon_cache, split_path +from swift.common.utils import get_logger, dump_recon_cache, split_path, \ + Timestamp from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ HTTP_PRECONDITION_FAILED @@ -93,65 +95,84 @@ class ObjectExpirer(Daemon): 'time': elapsed, 'objects': self.report_objects}) self.report_last_time = time() - def iter_cont_objs_to_expire(self): + def round_robin_order(self, task_iter): """ - Yields (container, obj) tuples to be deleted + Change order of expiration tasks to avoid deleting objects in a + certain container continuously. + + :param task_iter: An iterator of delete-task dicts, which should each + have a ``target_path`` key. 
""" - obj_cache = {} + obj_cache = defaultdict(deque) cnt = 0 - all_containers = set() + def dump_obj_cache_in_round_robin(): + while obj_cache: + for key in sorted(obj_cache): + if obj_cache[key]: + yield obj_cache[key].popleft() + else: + del obj_cache[key] + for delete_task in task_iter: + try: + target_account, target_container, _junk = \ + split_path('/' + delete_task['target_path'], 3, 3, True) + cache_key = '%s/%s' % (target_account, target_container) + except ValueError: + cache_key = None + + obj_cache[cache_key].append(delete_task) + cnt += 1 + + if cnt > MAX_OBJECTS_TO_CACHE: + for task in dump_obj_cache_in_round_robin(): + yield task + cnt = 0 + + for task in dump_obj_cache_in_round_robin(): + yield task + + def iter_task_containers_to_expire(self): + """ + Yields container name under the expiring_objects_account if + the container name (i.e. timestamp) is past. + """ for c in self.swift.iter_containers(self.expiring_objects_account): - container = str(c['name']) - timestamp = int(container) - if timestamp > int(time()): + task_container = str(c['name']) + timestamp = Timestamp(task_container) + if timestamp > Timestamp.now(): break - all_containers.add(container) + yield task_container + + def iter_task_to_expire(self, task_containers): + """ + Yields task expire info dict which consists of task_container, + task_object, target_path, and delete_timestamp + """ + + for task_container in task_containers: for o in self.swift.iter_objects(self.expiring_objects_account, - container): - obj = o['name'].encode('utf8') - timestamp, actual_obj = obj.split('-', 1) - timestamp = int(timestamp) - if timestamp > int(time()): + task_container): + task_object = o['name'].encode('utf8') + delete_timestamp, target_path = task_object.split('-', 1) + delete_timestamp = Timestamp(delete_timestamp) + if delete_timestamp > Timestamp.now(): + # we shouldn't yield the object that doesn't reach + # the expiration date yet. 
break - try: - cust_account, cust_cont, cust_obj = \ - split_path('/' + actual_obj, 3, 3, True) - cache_key = '%s/%s' % (cust_account, cust_cont) - except ValueError: - cache_key = None if self.processes > 0: obj_process = int( - hashlib.md5('%s/%s' % (container, obj)). + hashlib.md5('%s/%s' % (task_container, task_object)). hexdigest(), 16) if obj_process % self.processes != self.process: continue - if cache_key not in obj_cache: - obj_cache[cache_key] = [] - obj_cache[cache_key].append((container, obj)) - cnt += 1 - - if cnt > MAX_OBJECTS_TO_CACHE: - while obj_cache: - for key in obj_cache.keys(): - if obj_cache[key]: - yield obj_cache[key].pop() - cnt -= 1 - else: - del obj_cache[key] - - while obj_cache: - for key in obj_cache.keys(): - if obj_cache[key]: - yield obj_cache[key].pop() - else: - del obj_cache[key] - - for container in all_containers: - yield (container, None) + yield {'task_container': task_container, + 'task_object': task_object, + 'target_path': target_path, + 'delete_timestamp': delete_timestamp} def run_once(self, *args, **kwargs): """ @@ -166,7 +187,6 @@ class ObjectExpirer(Daemon): """ self.get_process_values(kwargs) pool = GreenPool(self.concurrency) - containers_to_delete = set([]) self.report_first_time = self.report_last_time = time() self.report_objects = 0 try: @@ -178,22 +198,19 @@ class ObjectExpirer(Daemon): '%(objects)s possible objects') % { 'containers': containers, 'objects': objects}) - for container, obj in self.iter_cont_objs_to_expire(): - containers_to_delete.add(container) + task_containers = list(self.iter_task_containers_to_expire()) - if not obj: - continue + # delete_task_iter is a generator that yields dicts of + # task_container, task_object, delete_timestamp and target_path, + # used to delete the actual object and pop the task from the queue. 
+ delete_task_iter = self.round_robin_order( + self.iter_task_to_expire(task_containers)) - timestamp, actual_obj = obj.split('-', 1) - timestamp = int(timestamp) - if timestamp > int(time()): - break - pool.spawn_n( - self.delete_object, actual_obj, timestamp, - container, obj) + for delete_task in delete_task_iter: + pool.spawn_n(self.delete_object, **delete_task) pool.waitall() - for container in containers_to_delete: + for container in task_containers: try: self.swift.delete_container( self.expiring_objects_account, @@ -257,33 +274,35 @@ class ObjectExpirer(Daemon): raise ValueError( 'process must be less than processes') - def delete_object(self, actual_obj, timestamp, container, obj): + def delete_object(self, target_path, delete_timestamp, + task_container, task_object): start_time = time() try: try: - self.delete_actual_object(actual_obj, timestamp) + self.delete_actual_object(target_path, delete_timestamp) except UnexpectedResponse as err: if err.resp.status_int not in {HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED}: raise - if float(timestamp) > time() - self.reclaim_age: + if float(delete_timestamp) > time() - self.reclaim_age: # we'll have to retry the DELETE later raise - self.pop_queue(container, obj) + self.pop_queue(task_container, task_object) self.report_objects += 1 self.logger.increment('objects') except UnexpectedResponse as err: self.logger.increment('errors') self.logger.error( 'Unexpected response while deleting object %(container)s ' - '%(obj)s: %(err)s' % {'container': container, 'obj': obj, - 'err': str(err.resp.status_int)}) + '%(obj)s: %(err)s' % { + 'container': task_container, 'obj': task_object, + 'err': str(err.resp.status_int)}) except (Exception, Timeout) as err: self.logger.increment('errors') self.logger.exception( 'Exception while deleting object %(container)s %(obj)s' - ' %(err)s' % {'container': container, - 'obj': obj, 'err': str(err)}) + ' %(err)s' % {'container': task_container, + 'obj': task_object, 'err': str(err)}) 
self.logger.timing_since('timing', start_time) self.report() @@ -304,15 +323,16 @@ class ObjectExpirer(Daemon): :param actual_obj: The name of the end-user object to delete: '<account>/<container>/<object>' - :param timestamp: The timestamp the X-Delete-At value must match to - perform the actual delete. + :param timestamp: The swift.common.utils.Timestamp instance the + X-Delete-At value must match to perform the actual + delete. :raises UnexpectedResponse: if the delete was unsuccessful and should be retried later """ path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/')) self.swift.make_request( 'DELETE', path, - {'X-If-Delete-At': str(timestamp), - 'X-Timestamp': str(timestamp), + {'X-If-Delete-At': timestamp.normal, + 'X-Timestamp': timestamp.normal, 'X-Backend-Clean-Expiring-Object-Queue': 'no'}, (2, HTTP_CONFLICT)) diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 84c833eb29..97f6c31524 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -26,6 +26,7 @@ import six from six.moves import urllib from swift.common import internal_client, utils, swob +from swift.common.utils import Timestamp from swift.obj import expirer @@ -215,11 +216,12 @@ class TestObjectExpirer(TestCase): self.deleted_objects = {} self.obj_containers_in_order = [] - def delete_object(self, actual_obj, timestamp, container, obj): - if container not in self.deleted_objects: - self.deleted_objects[container] = set() - self.deleted_objects[container].add(obj) - self.obj_containers_in_order.append(container) + def delete_object(self, target_path, delete_timestamp, + task_container, task_object): + if task_container not in self.deleted_objects: + self.deleted_objects[task_container] = set() + self.deleted_objects[task_container].add(task_object) + self.obj_containers_in_order.append(task_container) aco_dict = { '.expiring_objects': { @@ -321,6 +323,45 @@ class TestObjectExpirer(TestCase): self.assertTrue( 'so far' in str(x.logger.get_lines_for_level('info'))) + 
def test_round_robin_order(self): + x = expirer.ObjectExpirer(self.conf, logger=self.logger) + task_con_obj_list = [ + # objects in 0000 timestamp container + {'task_container': '0000', 'task_object': '0000-a/c0/o0', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'}, + {'task_container': '0000', 'task_object': '0000-a/c0/o1', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'}, + # objects in 0001 timestamp container + {'task_container': '0001', 'task_object': '0001-a/c1/o0', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'}, + {'task_container': '0001', 'task_object': '0001-a/c1/o1', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'}, + # objects in 0002 timestamp container + {'task_container': '0002', 'task_object': '0002-a/c2/o0', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'}, + {'task_container': '0002', 'task_object': '0002-a/c2/o1', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'}, + ] + result = list(x.round_robin_order(task_con_obj_list)) + + # sorted by popping one object to delete for each target_container + expected = [ + # objects in 0000 timestamp container + {'task_container': '0000', 'task_object': '0000-a/c0/o0', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'}, + {'task_container': '0001', 'task_object': '0001-a/c1/o0', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'}, + {'task_container': '0002', 'task_object': '0002-a/c2/o0', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'}, + {'task_container': '0000', 'task_object': '0000-a/c0/o1', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'}, + {'task_container': '0001', 'task_object': '0001-a/c1/o1', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'}, + {'task_container': '0002', 'task_object': '0002-a/c2/o1', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'}, + ] + self.assertEqual(expected, result) 
+ def test_run_once_nothing_to_do(self): x = expirer.ObjectExpirer(self.conf, logger=self.logger) x.swift = 'throw error because a string does not have needed methods' @@ -621,7 +662,7 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) - ts = '1234' + ts = Timestamp('1234') x.delete_actual_object('/path/to/object', ts) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], @@ -640,7 +681,7 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) - ts = '1234' + ts = Timestamp('1234') x.delete_actual_object('/path/to/object name', ts) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], @@ -659,11 +700,12 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) + ts = Timestamp('1234') if should_raise: with self.assertRaises(internal_client.UnexpectedResponse): - x.delete_actual_object('/path/to/object', '1234') + x.delete_actual_object('/path/to/object', ts) else: - x.delete_actual_object('/path/to/object', '1234') + x.delete_actual_object('/path/to/object', ts) self.assertEqual(calls[0], 1) # object was deleted and tombstone reaped @@ -688,7 +730,7 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) exc = None try: - x.delete_actual_object('/path/to/object', '1234') + x.delete_actual_object('/path/to/object', Timestamp('1234')) except Exception as err: exc = err finally: @@ -697,7 +739,7 @@ class TestObjectExpirer(TestCase): def test_delete_actual_object_quotes(self): name = 'this name should get quoted' - timestamp = '1366063156.863045' + timestamp = Timestamp('1366063156.863045') x = expirer.ObjectExpirer({}) x.swift.make_request = mock.Mock() x.swift.make_request.return_value.status_int = 204 @@ -708,7 +750,7 @@ class TestObjectExpirer(TestCase): def 
test_delete_actual_object_queue_cleaning(self): name = 'something' - timestamp = '1515544858.80602' + timestamp = Timestamp('1515544858.80602') x = expirer.ObjectExpirer({}) x.swift.make_request = mock.MagicMock() x.delete_actual_object(name, timestamp)