From 303635348b6cd30b13522d5c483934882178982b Mon Sep 17 00:00:00 2001 From: Kazuhiro MIYAHARA Date: Fri, 26 Jan 2018 07:54:31 +0000 Subject: [PATCH] Refactor expirer's task round robin implementation Object-expirer changes order of expiration tasks to avoid deleting objects in a certain container continuously. To make review for expirer's task queue update patch [1] easy, this patch refactors the implementation of the order change. In this patch, the order change is factored out into a function. In [1], there will be two implementations for legacy task queue and for general task queue. The two implementations have similar code. This patch helps to avoid duplicating code in the two implementations. Besides factoring out the function, this patch tries to resolve: - Separate container iteration and object iteration to avoid the generator termination with (container, None) tuple. - Using Timestamp class for delete_timestamp to be consistent with other modules - Change yielded delete task object info from tuple to dict because that includes several complex info (e.g. 
task_container, task_object, and target_path) - Fix minor docs and tests that depend on the changes above [1]: https://review.openstack.org/#/c/517389 Co-Authored-By: Kota Tsuyuzaki Change-Id: Ibf61eb1f767a48cb457dd494e1f7c12acfd205de --- swift/obj/expirer.py | 158 +++++++++++++++++++--------------- test/unit/obj/test_expirer.py | 66 +++++++++++--- 2 files changed, 143 insertions(+), 81 deletions(-) diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 2e9ae09e18..4d5d606de6 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -19,6 +19,7 @@ from random import random from time import time from os.path import join from swift import gettext_ as _ +from collections import defaultdict, deque import hashlib from eventlet import sleep, Timeout @@ -26,7 +27,8 @@ from eventlet.greenpool import GreenPool from swift.common.daemon import Daemon from swift.common.internal_client import InternalClient, UnexpectedResponse -from swift.common.utils import get_logger, dump_recon_cache, split_path +from swift.common.utils import get_logger, dump_recon_cache, split_path, \ + Timestamp from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ HTTP_PRECONDITION_FAILED @@ -93,65 +95,84 @@ class ObjectExpirer(Daemon): 'time': elapsed, 'objects': self.report_objects}) self.report_last_time = time() - def iter_cont_objs_to_expire(self): + def round_robin_order(self, task_iter): """ - Yields (container, obj) tuples to be deleted + Change order of expiration tasks to avoid deleting objects in a + certain container continuously. + + :param task_iter: An iterator of delete-task dicts, which should each + have a ``target_path`` key. 
""" - obj_cache = {} + obj_cache = defaultdict(deque) cnt = 0 - all_containers = set() + def dump_obj_cache_in_round_robin(): + while obj_cache: + for key in sorted(obj_cache): + if obj_cache[key]: + yield obj_cache[key].popleft() + else: + del obj_cache[key] + for delete_task in task_iter: + try: + target_account, target_container, _junk = \ + split_path('/' + delete_task['target_path'], 3, 3, True) + cache_key = '%s/%s' % (target_account, target_container) + except ValueError: + cache_key = None + + obj_cache[cache_key].append(delete_task) + cnt += 1 + + if cnt > MAX_OBJECTS_TO_CACHE: + for task in dump_obj_cache_in_round_robin(): + yield task + cnt = 0 + + for task in dump_obj_cache_in_round_robin(): + yield task + + def iter_task_containers_to_expire(self): + """ + Yields container name under the expiring_objects_account if + the container name (i.e. timestamp) is past. + """ for c in self.swift.iter_containers(self.expiring_objects_account): - container = str(c['name']) - timestamp = int(container) - if timestamp > int(time()): + task_container = str(c['name']) + timestamp = Timestamp(task_container) + if timestamp > Timestamp.now(): break - all_containers.add(container) + yield task_container + + def iter_task_to_expire(self, task_containers): + """ + Yields task expire info dict which consists of task_container, + target_path, timestamp_to_delete, and target_path + """ + + for task_container in task_containers: for o in self.swift.iter_objects(self.expiring_objects_account, - container): - obj = o['name'].encode('utf8') - timestamp, actual_obj = obj.split('-', 1) - timestamp = int(timestamp) - if timestamp > int(time()): + task_container): + task_object = o['name'].encode('utf8') + delete_timestamp, target_path = task_object.split('-', 1) + delete_timestamp = Timestamp(delete_timestamp) + if delete_timestamp > Timestamp.now(): + # we shouldn't yield the object that doesn't reach + # the expiration date yet. 
break - try: - cust_account, cust_cont, cust_obj = \ - split_path('/' + actual_obj, 3, 3, True) - cache_key = '%s/%s' % (cust_account, cust_cont) - except ValueError: - cache_key = None if self.processes > 0: obj_process = int( - hashlib.md5('%s/%s' % (container, obj)). + hashlib.md5('%s/%s' % (task_container, task_object)). hexdigest(), 16) if obj_process % self.processes != self.process: continue - if cache_key not in obj_cache: - obj_cache[cache_key] = [] - obj_cache[cache_key].append((container, obj)) - cnt += 1 - - if cnt > MAX_OBJECTS_TO_CACHE: - while obj_cache: - for key in obj_cache.keys(): - if obj_cache[key]: - yield obj_cache[key].pop() - cnt -= 1 - else: - del obj_cache[key] - - while obj_cache: - for key in obj_cache.keys(): - if obj_cache[key]: - yield obj_cache[key].pop() - else: - del obj_cache[key] - - for container in all_containers: - yield (container, None) + yield {'task_container': task_container, + 'task_object': task_object, + 'target_path': target_path, + 'delete_timestamp': delete_timestamp} def run_once(self, *args, **kwargs): """ @@ -166,7 +187,6 @@ class ObjectExpirer(Daemon): """ self.get_process_values(kwargs) pool = GreenPool(self.concurrency) - containers_to_delete = set([]) self.report_first_time = self.report_last_time = time() self.report_objects = 0 try: @@ -178,22 +198,19 @@ class ObjectExpirer(Daemon): '%(objects)s possible objects') % { 'containers': containers, 'objects': objects}) - for container, obj in self.iter_cont_objs_to_expire(): - containers_to_delete.add(container) + task_containers = list(self.iter_task_containers_to_expire()) - if not obj: - continue + # delete_task_iter is a generator to yield a dict of + # task_container, task_object, delete_timestamp, target_path + # to handle delete actual object and pop the task from the queue. 
+ delete_task_iter = self.round_robin_order( + self.iter_task_to_expire(task_containers)) - timestamp, actual_obj = obj.split('-', 1) - timestamp = int(timestamp) - if timestamp > int(time()): - break - pool.spawn_n( - self.delete_object, actual_obj, timestamp, - container, obj) + for delete_task in delete_task_iter: + pool.spawn_n(self.delete_object, **delete_task) pool.waitall() - for container in containers_to_delete: + for container in task_containers: try: self.swift.delete_container( self.expiring_objects_account, @@ -257,33 +274,35 @@ class ObjectExpirer(Daemon): raise ValueError( 'process must be less than processes') - def delete_object(self, actual_obj, timestamp, container, obj): + def delete_object(self, target_path, delete_timestamp, + task_container, task_object): start_time = time() try: try: - self.delete_actual_object(actual_obj, timestamp) + self.delete_actual_object(target_path, delete_timestamp) except UnexpectedResponse as err: if err.resp.status_int not in {HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED}: raise - if float(timestamp) > time() - self.reclaim_age: + if float(delete_timestamp) > time() - self.reclaim_age: # we'll have to retry the DELETE later raise - self.pop_queue(container, obj) + self.pop_queue(task_container, task_object) self.report_objects += 1 self.logger.increment('objects') except UnexpectedResponse as err: self.logger.increment('errors') self.logger.error( 'Unexpected response while deleting object %(container)s ' - '%(obj)s: %(err)s' % {'container': container, 'obj': obj, - 'err': str(err.resp.status_int)}) + '%(obj)s: %(err)s' % { + 'container': task_container, 'obj': task_object, + 'err': str(err.resp.status_int)}) except (Exception, Timeout) as err: self.logger.increment('errors') self.logger.exception( 'Exception while deleting object %(container)s %(obj)s' - ' %(err)s' % {'container': container, - 'obj': obj, 'err': str(err)}) + ' %(err)s' % {'container': task_container, + 'obj': task_object, 'err': str(err)}) 
self.logger.timing_since('timing', start_time) self.report() @@ -304,15 +323,16 @@ class ObjectExpirer(Daemon): :param actual_obj: The name of the end-user object to delete: '//' - :param timestamp: The timestamp the X-Delete-At value must match to - perform the actual delete. + :param timestamp: The swift.common.utils.Timestamp instance the + X-Delete-At value must match to perform the actual + delete. :raises UnexpectedResponse: if the delete was unsuccessful and should be retried later """ path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/')) self.swift.make_request( 'DELETE', path, - {'X-If-Delete-At': str(timestamp), - 'X-Timestamp': str(timestamp), + {'X-If-Delete-At': timestamp.normal, + 'X-Timestamp': timestamp.normal, 'X-Backend-Clean-Expiring-Object-Queue': 'no'}, (2, HTTP_CONFLICT)) diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 84c833eb29..97f6c31524 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -26,6 +26,7 @@ import six from six.moves import urllib from swift.common import internal_client, utils, swob +from swift.common.utils import Timestamp from swift.obj import expirer @@ -215,11 +216,12 @@ class TestObjectExpirer(TestCase): self.deleted_objects = {} self.obj_containers_in_order = [] - def delete_object(self, actual_obj, timestamp, container, obj): - if container not in self.deleted_objects: - self.deleted_objects[container] = set() - self.deleted_objects[container].add(obj) - self.obj_containers_in_order.append(container) + def delete_object(self, target_path, delete_timestamp, + task_container, task_object): + if task_container not in self.deleted_objects: + self.deleted_objects[task_container] = set() + self.deleted_objects[task_container].add(task_object) + self.obj_containers_in_order.append(task_container) aco_dict = { '.expiring_objects': { @@ -321,6 +323,45 @@ class TestObjectExpirer(TestCase): self.assertTrue( 'so far' in str(x.logger.get_lines_for_level('info'))) + 
def test_round_robin_order(self): + x = expirer.ObjectExpirer(self.conf, logger=self.logger) + task_con_obj_list = [ + # objects in 0000 timestamp container + {'task_container': '0000', 'task_object': '0000-a/c0/o0', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'}, + {'task_container': '0000', 'task_object': '0000-a/c0/o1', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'}, + # objects in 0001 timestamp container + {'task_container': '0001', 'task_object': '0001-a/c1/o0', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'}, + {'task_container': '0001', 'task_object': '0001-a/c1/o1', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'}, + # objects in 0002 timestamp container + {'task_container': '0002', 'task_object': '0002-a/c2/o0', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'}, + {'task_container': '0002', 'task_object': '0002-a/c2/o1', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'}, + ] + result = list(x.round_robin_order(task_con_obj_list)) + + # sorted by poping one object to delete for each target_container + expected = [ + # objects in 0000 timestamp container + {'task_container': '0000', 'task_object': '0000-a/c0/o0', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'}, + {'task_container': '0001', 'task_object': '0001-a/c1/o0', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'}, + {'task_container': '0002', 'task_object': '0002-a/c2/o0', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'}, + {'task_container': '0000', 'task_object': '0000-a/c0/o1', + 'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'}, + {'task_container': '0001', 'task_object': '0001-a/c1/o1', + 'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'}, + {'task_container': '0002', 'task_object': '0002-a/c2/o1', + 'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'}, + ] + self.assertEqual(expected, result) 
+ def test_run_once_nothing_to_do(self): x = expirer.ObjectExpirer(self.conf, logger=self.logger) x.swift = 'throw error because a string does not have needed methods' @@ -621,7 +662,7 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) - ts = '1234' + ts = Timestamp('1234') x.delete_actual_object('/path/to/object', ts) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], @@ -640,7 +681,7 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) - ts = '1234' + ts = Timestamp('1234') x.delete_actual_object('/path/to/object name', ts) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], @@ -659,11 +700,12 @@ class TestObjectExpirer(TestCase): internal_client.loadapp = lambda *a, **kw: fake_app x = expirer.ObjectExpirer({}) + ts = Timestamp('1234') if should_raise: with self.assertRaises(internal_client.UnexpectedResponse): - x.delete_actual_object('/path/to/object', '1234') + x.delete_actual_object('/path/to/object', ts) else: - x.delete_actual_object('/path/to/object', '1234') + x.delete_actual_object('/path/to/object', ts) self.assertEqual(calls[0], 1) # object was deleted and tombstone reaped @@ -688,7 +730,7 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) exc = None try: - x.delete_actual_object('/path/to/object', '1234') + x.delete_actual_object('/path/to/object', Timestamp('1234')) except Exception as err: exc = err finally: @@ -697,7 +739,7 @@ class TestObjectExpirer(TestCase): def test_delete_actual_object_quotes(self): name = 'this name should get quoted' - timestamp = '1366063156.863045' + timestamp = Timestamp('1366063156.863045') x = expirer.ObjectExpirer({}) x.swift.make_request = mock.Mock() x.swift.make_request.return_value.status_int = 204 @@ -708,7 +750,7 @@ class TestObjectExpirer(TestCase): def 
test_delete_actual_object_queue_cleaning(self): name = 'something' - timestamp = '1515544858.80602' + timestamp = Timestamp('1515544858.80602') x = expirer.ObjectExpirer({}) x.swift.make_request = mock.MagicMock() x.delete_actual_object(name, timestamp)