diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py
index 158deacf85..cecd552987 100644
--- a/swift/obj/expirer.py
+++ b/swift/obj/expirer.py
@@ -25,12 +25,14 @@ from eventlet.greenpool import GreenPool
 
 from swift.common.daemon import Daemon
 from swift.common.internal_client import InternalClient, UnexpectedResponse
-from swift.common.utils import get_logger, dump_recon_cache
+from swift.common.utils import get_logger, dump_recon_cache, split_path
 from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
     HTTP_PRECONDITION_FAILED
 
 from swift.container.reconciler import direct_delete_container_entry
 
+MAX_OBJECTS_TO_CACHE = 100000
+
 
 class ObjectExpirer(Daemon):
     """
@@ -85,6 +87,79 @@ class ObjectExpirer(Daemon):
                              (elapsed, self.report_objects))
             self.report_last_time = time()
 
+    def iter_cont_objs_to_expire(self):
+        """
+        Yields (container, obj) tuples to be deleted.
+
+        Containers of the expiring objects account are read until one
+        with a future timestamp is found; the objects of each container
+        are read until one with a future timestamp is found.  When
+        self.processes is greater than 0, only objects whose md5-based
+        number modulo self.processes equals self.process are kept.  The
+        kept objects are grouped by the "account/container" parsed from
+        their name (None if the name cannot be parsed) and yielded one
+        per group in turn, either when more than MAX_OBJECTS_TO_CACHE of
+        them are held or when the listing is done.  Finally every
+        container that was read is yielded as (container, None).
+        """
+        obj_cache = {}
+        cnt = 0
+
+        all_containers = set()
+
+        for c in self.swift.iter_containers(self.expiring_objects_account):
+            container = c['name']
+            timestamp = int(container)
+            if timestamp > int(time()):
+                break
+            all_containers.add(container)
+            for o in self.swift.iter_objects(self.expiring_objects_account,
+                                             container):
+                obj = o['name'].encode('utf8')
+                timestamp, actual_obj = obj.split('-', 1)
+                timestamp = int(timestamp)
+                if timestamp > int(time()):
+                    break
+                try:
+                    cust_account, cust_cont, cust_obj = \
+                        split_path('/' + actual_obj, 3, 3, True)
+                    cache_key = '%s/%s' % (cust_account, cust_cont)
+                except ValueError:
+                    cache_key = None
+
+                if self.processes > 0:
+                    obj_process = int(
+                        hashlib.md5('%s/%s' % (str(container), obj)).
+                        hexdigest(), 16)
+                    if obj_process % self.processes != self.process:
+                        continue
+
+                if cache_key not in obj_cache:
+                    obj_cache[cache_key] = []
+                obj_cache[cache_key].append((container, obj))
+                cnt += 1
+
+                if cnt > MAX_OBJECTS_TO_CACHE:
+                    while obj_cache:
+                        # iterate over a copy: keys are deleted in the loop
+                        for key in list(obj_cache):
+                            if obj_cache[key]:
+                                yield obj_cache[key].pop()
+                                cnt -= 1
+                            else:
+                                del obj_cache[key]
+
+        while obj_cache:
+            # iterate over a copy: keys are deleted in the loop
+            for key in list(obj_cache):
+                if obj_cache[key]:
+                    yield obj_cache[key].pop()
+                else:
+                    del obj_cache[key]
+
+        for container in all_containers:
+            yield (container, None)
+
     def run_once(self, *args, **kwargs):
         """
         Executes a single pass, looking for objects to expire.
@@ -96,9 +171,9 @@ class ObjectExpirer(Daemon):
                        These will override the values from the config file if
                        provided.
         """
-        processes, process = self.get_process_values(kwargs)
+        self.get_process_values(kwargs)
         pool = GreenPool(self.concurrency)
-        containers_to_delete = []
+        containers_to_delete = set()
         self.report_first_time = self.report_last_time = time()
         self.report_objects = 0
         try:
@@ -107,28 +182,22 @@ class ObjectExpirer(Daemon):
                 self.swift.get_account_info(self.expiring_objects_account)
             self.logger.info(_('Pass beginning; %s possible containers; %s '
                                'possible objects') % (containers, objects))
-            for c in self.swift.iter_containers(self.expiring_objects_account):
-                container = c['name']
-                timestamp = int(container)
+
+            for container, obj in self.iter_cont_objs_to_expire():
+                containers_to_delete.add(container)
+
+                # (container, None) items carry no object to delete
+                if not obj:
+                    continue
+
+                timestamp, actual_obj = obj.split('-', 1)
+                timestamp = int(timestamp)
                 if timestamp > int(time()):
                     break
-                containers_to_delete.append(container)
-                for o in self.swift.iter_objects(self.expiring_objects_account,
-                                                 container):
-                    obj = o['name'].encode('utf8')
-                    if processes > 0:
-                        obj_process = int(
-                            hashlib.md5('%s/%s' % (str(container), obj)).
-                            hexdigest(), 16)
-                        if obj_process % processes != process:
-                            continue
-                    timestamp, actual_obj = obj.split('-', 1)
-                    timestamp = int(timestamp)
-                    if timestamp > int(time()):
-                        break
-                    pool.spawn_n(
-                        self.delete_object, actual_obj, timestamp,
-                        container, obj)
+                pool.spawn_n(
+                    self.delete_object, actual_obj, timestamp,
+                    container, obj)
+
             pool.waitall()
             for container in containers_to_delete:
                 try:
@@ -167,38 +236,32 @@ class ObjectExpirer(Daemon):
 
     def get_process_values(self, kwargs):
         """
-        Gets the processes, process from the kwargs if those values exist.
-
-        Otherwise, return processes, process set in the config file.
+        Sets self.processes and self.process from the kwargs if those
+        values exist, otherwise, leaves those values as they were set in
+        the config file.
 
         :param kwargs: Keyword args passed into the run_forever(), run_once()
                        methods. They have values specified on the command
                        line when the daemon is run.
         """
         if kwargs.get('processes') is not None:
-            processes = int(kwargs['processes'])
-        else:
-            processes = self.processes
+            self.processes = int(kwargs['processes'])
 
         if kwargs.get('process') is not None:
-            process = int(kwargs['process'])
-        else:
-            process = self.process
+            self.process = int(kwargs['process'])
 
-        if process < 0:
+        if self.process < 0:
             raise ValueError(
                 'process must be an integer greater than or equal to 0')
 
-        if processes < 0:
+        if self.processes < 0:
             raise ValueError(
                 'processes must be an integer greater than or equal to 0')
 
-        if processes and process >= processes:
+        if self.processes and self.process >= self.processes:
             raise ValueError(
                 'process must be less than or equal to processes')
 
-        return processes, process
-
     def delete_object(self, actual_obj, timestamp, container, obj):
         start_time = time()
         try:
diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py
index 1e43f9c754..8046c719ba 100644
--- a/test/unit/obj/test_expirer.py
+++ b/test/unit/obj/test_expirer.py
@@ -65,7 +65,9 @@ class TestObjectExpirer(TestCase):
             'processes': 5,
             'process': 1,
         }
-        self.assertEqual((5, 1), x.get_process_values(vals))
+        x.get_process_values(vals)
+        self.assertEqual(x.processes, 5)
+        self.assertEqual(x.process, 1)
 
     def test_get_process_values_from_config(self):
         vals = {
@@ -73,7 +75,9 @@ class TestObjectExpirer(TestCase):
             'process': 1,
         }
         x = expirer.ObjectExpirer(vals)
-        self.assertEqual((5, 1), x.get_process_values({}))
+        x.get_process_values({})
+        self.assertEqual(x.processes, 5)
+        self.assertEqual(x.process, 1)
 
     def test_get_process_values_negative_process(self):
         vals = {
@@ -129,11 +133,13 @@ class TestObjectExpirer(TestCase):
                 super(ObjectExpirer, self).__init__(conf)
                 self.processes = 3
                 self.deleted_objects = {}
+                self.obj_containers_in_order = []
 
             def delete_object(self, actual_obj, timestamp, container, obj):
                 if container not in self.deleted_objects:
                     self.deleted_objects[container] = set()
                 self.deleted_objects[container].add(obj)
+                self.obj_containers_in_order.append(container)
 
         class InternalClient(object):
 
@@ -172,6 +178,7 @@ class TestObjectExpirer(TestCase):
             self.assertEqual(containers[ukey].pop(),
                              deleted_objects[ukey].pop().decode('utf8'))
         self.assertEqual(containers, deleted_objects)
+        self.assertEqual(len(set(x.obj_containers_in_order[:4])), 4)
 
     def test_delete_object(self):
         class InternalClient(object):
@@ -451,18 +458,19 @@ class TestObjectExpirer(TestCase):
 
         fake_swift = InternalClient(
             [{'name': str(int(time() - 86400))}],
-            [{'name': '%d-actual-obj' % int(time() - 86400)}])
+            [{'name': '%d-acc/c/actual-obj' % int(time() - 86400)}])
         x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift)
         x.delete_actual_object = lambda o, t: None
         x.pop_queue = lambda c, o: None
         self.assertEqual(x.report_objects, 0)
-        x.run_once()
-        self.assertEqual(x.report_objects, 1)
-        self.assertEqual(
-            x.logger.log_dict['info'],
-            [(('Pass beginning; 1 possible containers; '
-               '2 possible objects',), {}),
-             (('Pass completed in 0s; 1 objects expired',), {})])
+        with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0):
+            x.run_once()
+            self.assertEqual(x.report_objects, 1)
+            self.assertEqual(
+                x.logger.log_dict['info'],
+                [(('Pass beginning; 1 possible containers; '
+                   '2 possible objects',), {}),
+                 (('Pass completed in 0s; 1 objects expired',), {})])
 
     def test_delete_actual_object_does_not_get_unicode(self):
         class InternalClient(object):