diff --git a/bin/swift-object-expirer b/bin/swift-object-expirer index f3dbbdd7c7..90151580fd 100755 --- a/bin/swift-object-expirer +++ b/bin/swift-object-expirer @@ -17,8 +17,17 @@ from swift.common.daemon import run_daemon from swift.common.utils import parse_options from swift.obj.expirer import ObjectExpirer - +from optparse import OptionParser if __name__ == '__main__': - conf_file, options = parse_options(once=True) + parser = OptionParser("%prog CONFIG [options]") + parser.add_option('--processes', dest='processes', + help="Number of processes to use to do the work, don't " + "use this option to do all the work in one process") + parser.add_option('--process', dest='process', + help="Process number for this process, don't use " + "this option to do all the work in one process, this " + "is used to determine which part of the work this " + "process should do") + conf_file, options = parse_options(parser=parser, once=True) run_daemon(ObjectExpirer, conf_file, **options) diff --git a/doc/source/overview_expiring_objects.rst b/doc/source/overview_expiring_objects.rst index ba91570963..af10d57337 100644 --- a/doc/source/overview_expiring_objects.rst +++ b/doc/source/overview_expiring_objects.rst @@ -10,7 +10,13 @@ The ``X-Delete-After`` header takes a integer number of seconds. The proxy serve As expiring objects are added to the system, the object servers will record the expirations in a hidden ``.expiring_objects`` account for the ``swift-object-expirer`` to handle later. -Just one instance of the ``swift-object-expirer`` daemon needs to run for a cluster. This isn't exactly automatic failover high availability, but if this daemon doesn't run for a few hours it should not be any real issue. The expired-but-not-yet-deleted objects will still ``404 Not Found`` if someone tries to ``GET`` or ``HEAD`` them and they'll just be deleted a bit later when the daemon is restarted. +Usually, just one instance of the ``swift-object-expirer`` daemon needs to run for a cluster. This isn't exactly automatic failover high availability, but if this daemon doesn't run for a few hours it should not be any real issue. The expired-but-not-yet-deleted objects will still ``404 Not Found`` if someone tries to ``GET`` or ``HEAD`` them and they'll just be deleted a bit later when the daemon is restarted. + +By default, the ``swift-object-expirer`` daemon will run with a concurrency of 1. Increase this value to get more concurrency. A concurrency of 1 may not be enough to delete expiring objects in a timely fashion for a particular swift cluster. + +It is possible to run multiple daemons to do different parts of the work if a single process with a concurrency of more than 1 is not enough (see the sample config file for details). + +To run the ``swift-object-expirer`` as multiple processes, set ``processes`` to the number of processes (either in the config file or on the command line). Then run one process for each part. Use ``process`` to specify the part of the work to be done by a process using the command line or the config. So, for example, if you'd like to run three processes, set ``processes`` to 3 and run three processes with ``process`` set to 0, 1, and 2 for the three processes. If multiple processes are used, it's necessary to run one for each part of the work or that part of the work will not be done. The daemon uses the ``/etc/swift/object-expirer.conf`` by default, and here is a quick sample conf file:: @@ -21,21 +27,21 @@ The daemon uses the ``/etc/swift/object-expirer.conf`` by default, and here is a # log_name = swift # log_facility = LOG_LOCAL0 # log_level = INFO - + [object-expirer] interval = 300 - + [pipeline:main] pipeline = catch_errors cache proxy-server - + [app:proxy-server] use = egg:swift#proxy # See proxy-server.conf-sample for options - + [filter:cache] use = egg:swift#memcache # See proxy-server.conf-sample for options - + [filter:catch_errors] use = egg:swift#catch_errors # See proxy-server.conf-sample for options diff --git a/etc/object-expirer.conf-sample b/etc/object-expirer.conf-sample index 926d83a700..cef62be6c3 100644 --- a/etc/object-expirer.conf-sample +++ b/etc/object-expirer.conf-sample @@ -27,6 +27,21 @@ # interval = 300 # auto_create_account_prefix = . # report_interval = 300 +# concurrency is the level of concurrency o use to do the work, this value +# must be set to at least 1 +# concurrency = 1 +# processes is how many parts to divide the work into, one part per process +# that will be doing the work +# processes set 0 means that a single process will be doing all the work +# processes can also be specified on the command line and will override the +# config value +# processes = 0 +# process is which of the parts a particular process will work on +# process can also be specified on the command line and will overide the config +# value +# process is "zero based", if you want to use 3 processes, you should run +# processes with process set to 0, 1, and 2 +# process = 0 [pipeline:main] pipeline = catch_errors cache proxy-server diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 023b3a27b6..6607ee5442 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -17,8 +17,10 @@ import urllib from random import random from time import time from os.path import join +import hashlib from eventlet import sleep, Timeout +from eventlet.greenpool import GreenPool from swift.common.daemon import Daemon from swift.common.internal_client import InternalClient @@ -53,6 +55,11 @@ class ObjectExpirer(Daemon): self.recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift') self.rcache = join(self.recon_cache_path, 'object.recon') + self.concurrency = int(conf.get('concurrency', 1)) + if self.concurrency < 1: + raise ValueError("concurrency must be set to at least 1") + self.processes = int(self.conf.get('processes', 0)) + self.process = int(self.conf.get('process', 0)) def report(self, final=False): """ @@ -82,8 +89,13 @@ class ObjectExpirer(Daemon): :param args: Extra args to fulfill the Daemon interface; this daemon has no additional args. :param kwargs: Extra keyword args to fulfill the Daemon interface; this - daemon has no additional keyword args. + daemon accepts processes and process keyword args. + These will override the values from the config file if + provided. """ + processes, process = self.get_process_values(kwargs) + pool = GreenPool(self.concurrency) + containers_to_delete = [] self.report_first_time = self.report_last_time = time() self.report_objects = 0 try: @@ -97,27 +109,25 @@ class ObjectExpirer(Daemon): timestamp = int(container) if timestamp > int(time()): break + containers_to_delete.append(container) for o in self.swift.iter_objects(self.expiring_objects_account, container): obj = o['name'].encode('utf8') + if processes > 0: + obj_process = int( + hashlib.md5('%s/%s' % (container, obj)). + hexdigest(), 16) + if obj_process % processes != process: + continue timestamp, actual_obj = obj.split('-', 1) timestamp = int(timestamp) if timestamp > int(time()): break - start_time = time() - try: - self.delete_actual_object(actual_obj, timestamp) - self.swift.delete_object(self.expiring_objects_account, - container, obj) - self.report_objects += 1 - self.logger.increment('objects') - except (Exception, Timeout), err: - self.logger.increment('errors') - self.logger.exception( - _('Exception while deleting object %s %s %s') % - (container, obj, str(err))) - self.logger.timing_since('timing', start_time) - self.report() + pool.spawn_n( + self.delete_object, actual_obj, timestamp, + container, obj) + pool.waitall() + for container in containers_to_delete: try: self.swift.delete_container( self.expiring_objects_account, @@ -145,13 +155,63 @@ class ObjectExpirer(Daemon): while True: begin = time() try: - self.run_once() + self.run_once(*args, **kwargs) except (Exception, Timeout): self.logger.exception(_('Unhandled exception')) elapsed = time() - begin if elapsed < self.interval: sleep(random() * (self.interval - elapsed)) + def get_process_values(self, kwargs): + """ + Gets the processes, process from the kwargs if those values exist. + + Otherwise, return processes, process set in the config file. + + :param kwargs: Keyword args passed into the run_forever(), run_once() + methods. They have values specified on the command + line when the daemon is run. + """ + if kwargs.get('processes') is not None: + processes = int(kwargs['processes']) + else: + processes = self.processes + + if kwargs.get('process') is not None: + process = int(kwargs['process']) + else: + process = self.process + + if process < 0: + raise ValueError( + 'process must be an integer greater than or equal to 0') + + if processes < 0: + raise ValueError( + 'processes must be an integer greater than or equal to 0') + + if processes and process >= processes: + raise ValueError( + 'process must be less than or equal to processes') + + return processes, process + + def delete_object(self, actual_obj, timestamp, container, obj): + start_time = time() + try: + self.delete_actual_object(actual_obj, timestamp) + self.swift.delete_object(self.expiring_objects_account, + container, obj) + self.report_objects += 1 + self.logger.increment('objects') + except (Exception, Timeout), err: + self.logger.increment('errors') + self.logger.exception( + _('Exception while deleting object %s %s %s') % + (container, obj, str(err))) + self.logger.timing_since('timing', start_time) + self.report() + def delete_actual_object(self, actual_obj, timestamp): """ Deletes the end-user object indicated by the actual object name given diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 92154b32db..3cad20004e 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -17,6 +17,7 @@ import urllib from time import time from unittest import main, TestCase from test.unit import FakeLogger +from copy import deepcopy import mock @@ -52,12 +53,164 @@ class TestObjectExpirer(TestCase): internal_client.sleep = self.old_sleep internal_client.loadapp = self.loadapp + def test_get_process_values_from_kwargs(self): + x = expirer.ObjectExpirer({}) + vals = { + 'processes': 5, + 'process': 1, + } + self.assertEqual((5, 1), x.get_process_values(vals)) + + def test_get_process_values_from_config(self): + vals = { + 'processes': 5, + 'process': 1, + } + x = expirer.ObjectExpirer(vals) + self.assertEqual((5, 1), x.get_process_values({})) + + def test_get_process_values_negative_process(self): + vals = { + 'processes': 5, + 'process': -1, + } + # from config + x = expirer.ObjectExpirer(vals) + self.assertRaises(ValueError, x.get_process_values, {}) + # from kwargs + x = expirer.ObjectExpirer({}) + self.assertRaises(ValueError, x.get_process_values, vals) + + def test_get_process_values_negative_processes(self): + vals = { + 'processes': -5, + 'process': 1, + } + # from config + x = expirer.ObjectExpirer(vals) + self.assertRaises(ValueError, x.get_process_values, {}) + # from kwargs + x = expirer.ObjectExpirer({}) + self.assertRaises(ValueError, x.get_process_values, vals) + + def test_get_process_values_process_greater_than_processes(self): + vals = { + 'processes': 5, + 'process': 7, + } + # from config + x = expirer.ObjectExpirer(vals) + self.assertRaises(ValueError, x.get_process_values, {}) + # from kwargs + x = expirer.ObjectExpirer({}) + self.assertRaises(ValueError, x.get_process_values, vals) + + def test_init_concurrency_too_small(self): + conf = { + 'concurrency': 0, + } + self.assertRaises(ValueError, expirer.ObjectExpirer, conf) + conf = { + 'concurrency': -1, + } + self.assertRaises(ValueError, expirer.ObjectExpirer, conf) + + def test_process_based_concurrency(self): + class ObjectExpirer(expirer.ObjectExpirer): + def __init__(self, conf): + super(ObjectExpirer, self).__init__(conf) + self.processes = 3 + self.deleted_objects = {} + + def delete_object(self, actual_obj, timestamp, container, obj): + if not container in self.deleted_objects: + self.deleted_objects[container] = set() + self.deleted_objects[container].add(obj) + + class InternalClient(object): + def __init__(self, containers): + self.containers = containers + + def get_account_info(self, *a, **kw): + return len(self.containers.keys()), \ + sum([len(self.containers[x]) for x in self.containers]) + + def iter_containers(self, *a, **kw): + return [{'name': x} for x in self.containers.keys()] + + def iter_objects(self, account, container): + return [{'name': x} for x in self.containers[container]] + + def delete_container(*a, **kw): + pass + + containers = { + 0: set('1-one 2-two 3-three'.split()), + 1: set('2-two 3-three 4-four'.split()), + 2: set('5-five 6-six'.split()), + 3: set('7-seven'.split()), + } + x = ObjectExpirer({}) + x.swift = InternalClient(containers) + + deleted_objects = {} + for i in xrange(0, 3): + x.process = i + x.run_once() + self.assertNotEqual(deleted_objects, x.deleted_objects) + deleted_objects = deepcopy(x.deleted_objects) + self.assertEqual(containers, deleted_objects) + + def test_delete_object(self): + class InternalClient(object): + def __init__(self, test, account, container, obj): + self.test = test + self.account = account + self.container = container + self.obj = obj + self.delete_object_called = False + + def delete_object(self, account, container, obj): + self.test.assertEqual(self.account, account) + self.test.assertEqual(self.container, container) + self.test.assertEqual(self.obj, obj) + self.delete_object_called = True + + class DeleteActualObject(object): + def __init__(self, test, actual_obj, timestamp): + self.test = test + self.actual_obj = actual_obj + self.timestamp = timestamp + self.called = False + + def __call__(self, actual_obj, timestamp): + self.test.assertEqual(self.actual_obj, actual_obj) + self.test.assertEqual(self.timestamp, timestamp) + self.called = True + + account = 'account' + container = 'container' + obj = 'obj' + actual_obj = 'actual_obj' + timestamp = 'timestamp' + + x = expirer.ObjectExpirer({}) + x.logger = FakeLogger() + x.swift = \ + InternalClient(self, x.expiring_objects_account, container, obj) + x.delete_actual_object = \ + DeleteActualObject(self, actual_obj, timestamp) + + x.delete_object(actual_obj, timestamp, container, obj) + self.assertTrue(x.swift.delete_object_called) + self.assertTrue(x.delete_actual_object.called) + def test_report(self): x = expirer.ObjectExpirer({}) x.logger = FakeLogger() x.report() - self.assertEquals(x.logger.log_dict['info'], []) + self.assertEqual(x.logger.log_dict['info'], []) x.logger._clear() x.report(final=True) @@ -79,7 +232,7 @@ class TestObjectExpirer(TestCase): x.logger = FakeLogger() x.swift = 'throw error because a string does not have needed methods' x.run_once() - self.assertEquals(x.logger.log_dict['exception'], + self.assertEqual(x.logger.log_dict['exception'], [(("Unhandled exception",), {}, "'str' object has no attribute " "'get_account_info'")]) @@ -96,7 +249,7 @@ class TestObjectExpirer(TestCase): x.logger = FakeLogger() x.swift = InternalClient() x.run_once() - self.assertEquals( + self.assertEqual( x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), @@ -123,7 +276,7 @@ class TestObjectExpirer(TestCase): for exccall in x.logger.log_dict['exception']: self.assertTrue( 'This should not have been called' not in exccall[0][0]) - self.assertEquals( + self.assertEqual( x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), @@ -134,7 +287,7 @@ class TestObjectExpirer(TestCase): x.logger = FakeLogger() x.swift = InternalClient([{'name': str(int(time() - 86400))}]) x.run_once() - self.assertEquals(x.logger.log_dict['exception'], + self.assertEqual(x.logger.log_dict['exception'], [(('Unhandled exception',), {}, str(Exception('This should not have been called')))]) @@ -167,7 +320,7 @@ class TestObjectExpirer(TestCase): for exccall in x.logger.log_dict['exception']: self.assertTrue( 'This should not have been called' not in exccall[0][0]) - self.assertEquals(x.logger.log_dict['info'], + self.assertEqual(x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), (('Pass completed in 0s; 0 objects expired',), {})]) @@ -184,7 +337,7 @@ class TestObjectExpirer(TestCase): for exccall in x.logger.log_dict['exception']: if exccall[0][0].startswith('Exception while deleting '): excswhiledeleting.append(exccall[0][0]) - self.assertEquals(excswhiledeleting, + self.assertEqual(excswhiledeleting, ['Exception while deleting object %d %d-actual-obj ' 'This should not have been called' % (ts, ts)]) @@ -227,10 +380,10 @@ class TestObjectExpirer(TestCase): for exccall in x.logger.log_dict['exception']: if exccall[0][0].startswith('Exception while deleting '): excswhiledeleting.append(exccall[0][0]) - self.assertEquals(excswhiledeleting, + self.assertEqual(excswhiledeleting, ['Exception while deleting object %d %d-actual-obj ' 'failed to delete actual object' % (ts, ts)]) - self.assertEquals(x.logger.log_dict['info'], + self.assertEqual(x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), (('Pass completed in 0s; 0 objects expired',), {})]) @@ -247,7 +400,7 @@ class TestObjectExpirer(TestCase): for exccall in x.logger.log_dict['exception']: if exccall[0][0].startswith('Exception while deleting '): excswhiledeleting.append(exccall[0][0]) - self.assertEquals(excswhiledeleting, + self.assertEqual(excswhiledeleting, ['Exception while deleting object %d %d-actual-obj This should ' 'not have been called' % (ts, ts)]) @@ -275,12 +428,12 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) x.logger = FakeLogger() x.delete_actual_object = lambda o, t: None - self.assertEquals(x.report_objects, 0) + self.assertEqual(x.report_objects, 0) x.swift = InternalClient([{'name': str(int(time() - 86400))}], [{'name': '%d-actual-obj' % int(time() - 86400)}]) x.run_once() - self.assertEquals(x.report_objects, 1) - self.assertEquals(x.logger.log_dict['info'], + self.assertEqual(x.report_objects, 1) + self.assertEqual(x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), (('Pass completed in 0s; 1 objects expired',), {})]) @@ -315,12 +468,12 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) x.logger = FakeLogger() x.delete_actual_object = delete_actual_object_test_for_unicode - self.assertEquals(x.report_objects, 0) + self.assertEqual(x.report_objects, 0) x.swift = InternalClient([{'name': str(int(time() - 86400))}], [{'name': u'%d-actual-obj' % int(time() - 86400)}]) x.run_once() - self.assertEquals(x.report_objects, 1) - self.assertEquals(x.logger.log_dict['info'], + self.assertEqual(x.report_objects, 1) + self.assertEqual(x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), (('Pass completed in 0s; 1 objects expired',), {})]) @@ -373,20 +526,20 @@ class TestObjectExpirer(TestCase): for exccall in x.logger.log_dict['exception']: if exccall[0][0].startswith('Exception while deleting '): excswhiledeleting.append(exccall[0][0]) - self.assertEquals(excswhiledeleting, [ + self.assertEqual(sorted(excswhiledeleting), sorted([ 'Exception while deleting object %d %d-actual-obj failed to ' 'delete actual object' % (cts, ots), 'Exception while deleting object %d %d-next-obj failed to ' 'delete actual object' % (cts, ots), + 'Exception while deleting object %d %d-actual-obj failed to ' + 'delete actual object' % (cts + 1, ots), + 'Exception while deleting object %d %d-next-obj failed to ' + 'delete actual object' % (cts + 1, ots), 'Exception while deleting container %d failed to delete ' 'container' % (cts,), - 'Exception while deleting object %d %d-actual-obj failed to ' - 'delete actual object' % (cts + 1, ots), - 'Exception while deleting object %d %d-next-obj failed to ' - 'delete actual object' % (cts + 1, ots), 'Exception while deleting container %d failed to delete ' - 'container' % (cts + 1,)]) - self.assertEquals(x.logger.log_dict['info'], + 'container' % (cts + 1,)])) + self.assertEqual(x.logger.log_dict['info'], [(('Pass beginning; 1 possible containers; ' '2 possible objects',), {}), (('Pass completed in 0s; 0 objects expired',), {})]) @@ -412,8 +565,8 @@ class TestObjectExpirer(TestCase): finally: expirer.random = orig_random expirer.sleep = orig_sleep - self.assertEquals(str(err), 'test_run_forever') - self.assertEquals(last_not_sleep, 0.5 * interval) + self.assertEqual(str(err), 'test_run_forever') + self.assertEqual(last_not_sleep, 0.5 * interval) def test_run_forever_catches_usual_exceptions(self): raises = [0] @@ -435,8 +588,8 @@ class TestObjectExpirer(TestCase): pass finally: expirer.sleep = orig_sleep - self.assertEquals(str(err), 'exiting exception 2') - self.assertEquals(x.logger.log_dict['exception'], + self.assertEqual(str(err), 'exiting exception 2') + self.assertEqual(x.logger.log_dict['exception'], [(('Unhandled exception',), {}, 'exception 1')]) @@ -453,7 +606,7 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) ts = '1234' x.delete_actual_object('/path/to/object', ts) - self.assertEquals(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) + self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) def test_delete_actual_object_nourlquoting(self): # delete_actual_object should not do its own url quoting because @@ -470,8 +623,8 @@ class TestObjectExpirer(TestCase): x = expirer.ObjectExpirer({}) ts = '1234' x.delete_actual_object('/path/to/object name', ts) - self.assertEquals(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) - self.assertEquals(got_env[0]['PATH_INFO'], '/v1/path/to/object name') + self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) + self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name') def test_delete_actual_object_handles_404(self): @@ -513,7 +666,7 @@ class TestObjectExpirer(TestCase): exc = err finally: pass - self.assertEquals(503, exc.resp.status_int) + self.assertEqual(503, exc.resp.status_int) def test_delete_actual_object_quotes(self): name = 'this name should get quoted' @@ -522,7 +675,7 @@ class TestObjectExpirer(TestCase): x.swift.make_request = mock.MagicMock() x.delete_actual_object(name, timestamp) x.swift.make_request.assert_called_once() - self.assertEquals(x.swift.make_request.call_args[0][1], + self.assertEqual(x.swift.make_request.call_args[0][1], '/v1/' + urllib.quote(name))