From eff9ab74a4b88102e8938c52cb01b311f932949e Mon Sep 17 00:00:00 2001 From: David Goetz Date: Fri, 15 Aug 2014 15:54:05 -0700 Subject: [PATCH] Delete expired objects in a slightly smarter way. When the expirer tries to delete customer objects, if it just walks through the containers in order the daemon will tend to send DELETEs to the same container highly concurrently. This will in turn create a lot of asyncs because of all the concurrent deletes. If the deletes were spread out to multiple containers it would improve performance and decrease the number of asyncs made. Change-Id: I3d08118c197b7f18dd7e880bd5664508934ffd24 --- swift/obj/expirer.py | 123 ++++++++++++++++++++++++---------- test/unit/obj/test_expirer.py | 28 +++++--- 2 files changed, 104 insertions(+), 47 deletions(-) diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 158deacf85..cecd552987 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -25,12 +25,14 @@ from eventlet.greenpool import GreenPool from swift.common.daemon import Daemon from swift.common.internal_client import InternalClient, UnexpectedResponse -from swift.common.utils import get_logger, dump_recon_cache +from swift.common.utils import get_logger, dump_recon_cache, split_path from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ HTTP_PRECONDITION_FAILED from swift.container.reconciler import direct_delete_container_entry +MAX_OBJECTS_TO_CACHE = 100000 + class ObjectExpirer(Daemon): """ @@ -85,6 +87,66 @@ class ObjectExpirer(Daemon): (elapsed, self.report_objects)) self.report_last_time = time() + def iter_cont_objs_to_expire(self): + """ + Yields (container, obj) tuples to be deleted + """ + obj_cache = {} + cnt = 0 + + all_containers = set() + + for c in self.swift.iter_containers(self.expiring_objects_account): + container = c['name'] + timestamp = int(container) + if timestamp > int(time()): + break + all_containers.add(container) + for o in self.swift.iter_objects(self.expiring_objects_account, 
container): + obj = o['name'].encode('utf8') + timestamp, actual_obj = obj.split('-', 1) + timestamp = int(timestamp) + if timestamp > int(time()): + break + try: + cust_account, cust_cont, cust_obj = \ + split_path('/' + actual_obj, 3, 3, True) + cache_key = '%s/%s' % (cust_account, cust_cont) + except ValueError: + cache_key = None + + if self.processes > 0: + obj_process = int( + hashlib.md5('%s/%s' % (str(container), obj)). + hexdigest(), 16) + if obj_process % self.processes != self.process: + continue + + if cache_key not in obj_cache: + obj_cache[cache_key] = [] + obj_cache[cache_key].append((container, obj)) + cnt += 1 + + if cnt > MAX_OBJECTS_TO_CACHE: + while obj_cache: + for key in obj_cache.keys(): + if obj_cache[key]: + yield obj_cache[key].pop() + cnt -= 1 + else: + del obj_cache[key] + + while obj_cache: + for key in obj_cache.keys(): + if obj_cache[key]: + yield obj_cache[key].pop() + else: + del obj_cache[key] + + for container in all_containers: + yield (container, None) + def run_once(self, *args, **kwargs): """ Executes a single pass, looking for objects to expire. @@ -96,9 +158,9 @@ class ObjectExpirer(Daemon): These will override the values from the config file if provided. 
""" - processes, process = self.get_process_values(kwargs) + self.get_process_values(kwargs) pool = GreenPool(self.concurrency) - containers_to_delete = [] + containers_to_delete = set([]) self.report_first_time = self.report_last_time = time() self.report_objects = 0 try: @@ -107,28 +169,21 @@ class ObjectExpirer(Daemon): self.swift.get_account_info(self.expiring_objects_account) self.logger.info(_('Pass beginning; %s possible containers; %s ' 'possible objects') % (containers, objects)) - for c in self.swift.iter_containers(self.expiring_objects_account): - container = c['name'] - timestamp = int(container) + + for container, obj in self.iter_cont_objs_to_expire(): + containers_to_delete.add(container) + + if not obj: + continue + + timestamp, actual_obj = obj.split('-', 1) + timestamp = int(timestamp) if timestamp > int(time()): break - containers_to_delete.append(container) - for o in self.swift.iter_objects(self.expiring_objects_account, - container): - obj = o['name'].encode('utf8') - if processes > 0: - obj_process = int( - hashlib.md5('%s/%s' % (str(container), obj)). - hexdigest(), 16) - if obj_process % processes != process: - continue - timestamp, actual_obj = obj.split('-', 1) - timestamp = int(timestamp) - if timestamp > int(time()): - break - pool.spawn_n( - self.delete_object, actual_obj, timestamp, - container, obj) + pool.spawn_n( + self.delete_object, actual_obj, timestamp, + container, obj) + pool.waitall() for container in containers_to_delete: try: @@ -167,38 +222,32 @@ class ObjectExpirer(Daemon): def get_process_values(self, kwargs): """ - Gets the processes, process from the kwargs if those values exist. - - Otherwise, return processes, process set in the config file. + Sets self.processes and self.process from the kwargs if those + values exist, otherwise, leaves those values as they were set in + the config file. :param kwargs: Keyword args passed into the run_forever(), run_once() methods. 
They have values specified on the command line when the daemon is run. """ if kwargs.get('processes') is not None: - processes = int(kwargs['processes']) - else: - processes = self.processes + self.processes = int(kwargs['processes']) if kwargs.get('process') is not None: - process = int(kwargs['process']) - else: - process = self.process + self.process = int(kwargs['process']) - if process < 0: + if self.process < 0: raise ValueError( 'process must be an integer greater than or equal to 0') - if processes < 0: + if self.processes < 0: raise ValueError( 'processes must be an integer greater than or equal to 0') - if processes and process >= processes: + if self.processes and self.process >= self.processes: raise ValueError( 'process must be less than or equal to processes') - return processes, process - def delete_object(self, actual_obj, timestamp, container, obj): start_time = time() try: diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 1e43f9c754..8046c719ba 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -65,7 +65,9 @@ class TestObjectExpirer(TestCase): 'processes': 5, 'process': 1, } - self.assertEqual((5, 1), x.get_process_values(vals)) + x.get_process_values(vals) + self.assertEqual(x.processes, 5) + self.assertEqual(x.process, 1) def test_get_process_values_from_config(self): vals = { @@ -73,7 +75,9 @@ class TestObjectExpirer(TestCase): 'process': 1, } x = expirer.ObjectExpirer(vals) - self.assertEqual((5, 1), x.get_process_values({})) + x.get_process_values({}) + self.assertEqual(x.processes, 5) + self.assertEqual(x.process, 1) def test_get_process_values_negative_process(self): vals = { @@ -129,11 +133,13 @@ class TestObjectExpirer(TestCase): super(ObjectExpirer, self).__init__(conf) self.processes = 3 self.deleted_objects = {} + self.obj_containers_in_order = [] def delete_object(self, actual_obj, timestamp, container, obj): if container not in self.deleted_objects: 
self.deleted_objects[container] = set() self.deleted_objects[container].add(obj) + self.obj_containers_in_order.append(container) class InternalClient(object): @@ -172,6 +178,7 @@ class TestObjectExpirer(TestCase): self.assertEqual(containers[ukey].pop(), deleted_objects[ukey].pop().decode('utf8')) self.assertEqual(containers, deleted_objects) + self.assertEqual(len(set(x.obj_containers_in_order[:4])), 4) def test_delete_object(self): class InternalClient(object): @@ -451,18 +458,19 @@ class TestObjectExpirer(TestCase): fake_swift = InternalClient( [{'name': str(int(time() - 86400))}], - [{'name': '%d-actual-obj' % int(time() - 86400)}]) + [{'name': '%d-acc/c/actual-obj' % int(time() - 86400)}]) x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) x.delete_actual_object = lambda o, t: None x.pop_queue = lambda c, o: None self.assertEqual(x.report_objects, 0) - x.run_once() - self.assertEqual(x.report_objects, 1) - self.assertEqual( - x.logger.log_dict['info'], - [(('Pass beginning; 1 possible containers; ' - '2 possible objects',), {}), - (('Pass completed in 0s; 1 objects expired',), {})]) + with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0): + x.run_once() + self.assertEqual(x.report_objects, 1) + self.assertEqual( + x.logger.log_dict['info'], + [(('Pass beginning; 1 possible containers; ' + '2 possible objects',), {}), + (('Pass completed in 0s; 1 objects expired',), {})]) def test_delete_actual_object_does_not_get_unicode(self): class InternalClient(object):