Add parallelism to object expirer daemon.

Two types of parallelism are added:

- concurrency to speed up what a single process does
- a way to run multiple daemons to work on different parts of the work

DocImpact

Change-Id: I48997f68eb2fd8de19a5ee8b9fcdf76dde2ba0ab
This commit is contained in:
Greg Lange 2013-05-01 21:11:25 +00:00
parent d69fa437cd
commit 209c5ec418
5 changed files with 299 additions and 56 deletions

View File

@ -17,8 +17,17 @@
from swift.common.daemon import run_daemon
from swift.common.utils import parse_options
from swift.obj.expirer import ObjectExpirer
from optparse import OptionParser
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
parser = OptionParser("%prog CONFIG [options]")
parser.add_option('--processes', dest='processes',
help="Number of processes to use to do the work, don't "
"use this option to do all the work in one process")
parser.add_option('--process', dest='process',
help="Process number for this process, don't use "
"this option to do all the work in one process, this "
"is used to determine which part of the work this "
"process should do")
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectExpirer, conf_file, **options)

View File

@ -10,7 +10,13 @@ The ``X-Delete-After`` header takes a integer number of seconds. The proxy serve
As expiring objects are added to the system, the object servers will record the expirations in a hidden ``.expiring_objects`` account for the ``swift-object-expirer`` to handle later.
Just one instance of the ``swift-object-expirer`` daemon needs to run for a cluster. This isn't exactly automatic failover high availability, but if this daemon doesn't run for a few hours it should not be any real issue. The expired-but-not-yet-deleted objects will still ``404 Not Found`` if someone tries to ``GET`` or ``HEAD`` them and they'll just be deleted a bit later when the daemon is restarted.
Usually, just one instance of the ``swift-object-expirer`` daemon needs to run for a cluster. This isn't exactly automatic failover high availability, but if this daemon doesn't run for a few hours it should not be any real issue. The expired-but-not-yet-deleted objects will still ``404 Not Found`` if someone tries to ``GET`` or ``HEAD`` them and they'll just be deleted a bit later when the daemon is restarted.
By default, the ``swift-object-expirer`` daemon will run with a concurrency of 1. Increase this value to get more concurrency. A concurrency of 1 may not be enough to delete expiring objects in a timely fashion for a particular swift cluster.
It is possible to run multiple daemons to do different parts of the work if a single process with a concurrency of more than 1 is not enough (see the sample config file for details).
To run the ``swift-object-expirer`` as multiple processes, set ``processes`` to the number of processes (either in the config file or on the command line). Then run one process for each part. Use ``process`` to specify the part of the work to be done by a process using the command line or the config. So, for example, if you'd like to run three processes, set ``processes`` to 3 and run three processes with ``process`` set to 0, 1, and 2 for the three processes. If multiple processes are used, it's necessary to run one for each part of the work or that part of the work will not be done.
The daemon uses the ``/etc/swift/object-expirer.conf`` by default, and here is a quick sample conf file::
@ -21,21 +27,21 @@ The daemon uses the ``/etc/swift/object-expirer.conf`` by default, and here is a
# log_name = swift
# log_facility = LOG_LOCAL0
# log_level = INFO
[object-expirer]
interval = 300
[pipeline:main]
pipeline = catch_errors cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
# See proxy-server.conf-sample for options
[filter:cache]
use = egg:swift#memcache
# See proxy-server.conf-sample for options
[filter:catch_errors]
use = egg:swift#catch_errors
# See proxy-server.conf-sample for options

View File

@ -24,6 +24,21 @@
# interval = 300
# auto_create_account_prefix = .
# report_interval = 300
# concurrency is the level of concurrency o use to do the work, this value
# must be set to at least 1
# concurrency = 1
# processes is how many parts to divide the work into, one part per process
# that will be doing the work
# processes set 0 means that a single process will be doing all the work
# processes can also be specified on the command line and will override the
# config value
# processes = 0
# process is which of the parts a particular process will work on
# process can also be specified on the command line and will overide the config
# value
# process is "zero based", if you want to use 3 processes, you should run
# processes with process set to 0, 1, and 2
# process = 0
[pipeline:main]
pipeline = catch_errors cache proxy-server

View File

@ -17,8 +17,10 @@ import urllib
from random import random
from time import time
from os.path import join
import hashlib
from eventlet import sleep, Timeout
from eventlet.greenpool import GreenPool
from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient
@ -53,6 +55,11 @@ class ObjectExpirer(Daemon):
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = join(self.recon_cache_path, 'object.recon')
self.concurrency = int(conf.get('concurrency', 1))
if self.concurrency < 1:
raise ValueError("concurrency must be set to at least 1")
self.processes = int(self.conf.get('processes', 0))
self.process = int(self.conf.get('process', 0))
def report(self, final=False):
"""
@ -82,8 +89,13 @@ class ObjectExpirer(Daemon):
:param args: Extra args to fulfill the Daemon interface; this daemon
has no additional args.
:param kwargs: Extra keyword args to fulfill the Daemon interface; this
daemon has no additional keyword args.
daemon accepts processes and process keyword args.
These will override the values from the config file if
provided.
"""
processes, process = self.get_process_values(kwargs)
pool = GreenPool(self.concurrency)
containers_to_delete = []
self.report_first_time = self.report_last_time = time()
self.report_objects = 0
try:
@ -97,27 +109,25 @@ class ObjectExpirer(Daemon):
timestamp = int(container)
if timestamp > int(time()):
break
containers_to_delete.append(container)
for o in self.swift.iter_objects(self.expiring_objects_account,
container):
obj = o['name'].encode('utf8')
if processes > 0:
obj_process = int(
hashlib.md5('%s/%s' % (container, obj)).
hexdigest(), 16)
if obj_process % processes != process:
continue
timestamp, actual_obj = obj.split('-', 1)
timestamp = int(timestamp)
if timestamp > int(time()):
break
start_time = time()
try:
self.delete_actual_object(actual_obj, timestamp)
self.swift.delete_object(self.expiring_objects_account,
container, obj)
self.report_objects += 1
self.logger.increment('objects')
except (Exception, Timeout), err:
self.logger.increment('errors')
self.logger.exception(
_('Exception while deleting object %s %s %s') %
(container, obj, str(err)))
self.logger.timing_since('timing', start_time)
self.report()
pool.spawn_n(
self.delete_object, actual_obj, timestamp,
container, obj)
pool.waitall()
for container in containers_to_delete:
try:
self.swift.delete_container(
self.expiring_objects_account,
@ -145,13 +155,63 @@ class ObjectExpirer(Daemon):
while True:
begin = time()
try:
self.run_once()
self.run_once(*args, **kwargs)
except (Exception, Timeout):
self.logger.exception(_('Unhandled exception'))
elapsed = time() - begin
if elapsed < self.interval:
sleep(random() * (self.interval - elapsed))
def get_process_values(self, kwargs):
"""
Gets the processes, process from the kwargs if those values exist.
Otherwise, return processes, process set in the config file.
:param kwargs: Keyword args passed into the run_forever(), run_once()
methods. They have values specified on the command
line when the daemon is run.
"""
if kwargs.get('processes') is not None:
processes = int(kwargs['processes'])
else:
processes = self.processes
if kwargs.get('process') is not None:
process = int(kwargs['process'])
else:
process = self.process
if process < 0:
raise ValueError(
'process must be an integer greater than or equal to 0')
if processes < 0:
raise ValueError(
'processes must be an integer greater than or equal to 0')
if processes and process >= processes:
raise ValueError(
'process must be less than or equal to processes')
return processes, process
def delete_object(self, actual_obj, timestamp, container, obj):
start_time = time()
try:
self.delete_actual_object(actual_obj, timestamp)
self.swift.delete_object(self.expiring_objects_account,
container, obj)
self.report_objects += 1
self.logger.increment('objects')
except (Exception, Timeout), err:
self.logger.increment('errors')
self.logger.exception(
_('Exception while deleting object %s %s %s') %
(container, obj, str(err)))
self.logger.timing_since('timing', start_time)
self.report()
def delete_actual_object(self, actual_obj, timestamp):
"""
Deletes the end-user object indicated by the actual object name given

View File

@ -17,6 +17,7 @@ import urllib
from time import time
from unittest import main, TestCase
from test.unit import FakeLogger
from copy import deepcopy
import mock
@ -52,12 +53,164 @@ class TestObjectExpirer(TestCase):
internal_client.sleep = self.old_sleep
internal_client.loadapp = self.loadapp
def test_get_process_values_from_kwargs(self):
x = expirer.ObjectExpirer({})
vals = {
'processes': 5,
'process': 1,
}
self.assertEqual((5, 1), x.get_process_values(vals))
def test_get_process_values_from_config(self):
vals = {
'processes': 5,
'process': 1,
}
x = expirer.ObjectExpirer(vals)
self.assertEqual((5, 1), x.get_process_values({}))
def test_get_process_values_negative_process(self):
vals = {
'processes': 5,
'process': -1,
}
# from config
x = expirer.ObjectExpirer(vals)
self.assertRaises(ValueError, x.get_process_values, {})
# from kwargs
x = expirer.ObjectExpirer({})
self.assertRaises(ValueError, x.get_process_values, vals)
def test_get_process_values_negative_processes(self):
vals = {
'processes': -5,
'process': 1,
}
# from config
x = expirer.ObjectExpirer(vals)
self.assertRaises(ValueError, x.get_process_values, {})
# from kwargs
x = expirer.ObjectExpirer({})
self.assertRaises(ValueError, x.get_process_values, vals)
def test_get_process_values_process_greater_than_processes(self):
vals = {
'processes': 5,
'process': 7,
}
# from config
x = expirer.ObjectExpirer(vals)
self.assertRaises(ValueError, x.get_process_values, {})
# from kwargs
x = expirer.ObjectExpirer({})
self.assertRaises(ValueError, x.get_process_values, vals)
def test_init_concurrency_too_small(self):
conf = {
'concurrency': 0,
}
self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
conf = {
'concurrency': -1,
}
self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
def test_process_based_concurrency(self):
class ObjectExpirer(expirer.ObjectExpirer):
def __init__(self, conf):
super(ObjectExpirer, self).__init__(conf)
self.processes = 3
self.deleted_objects = {}
def delete_object(self, actual_obj, timestamp, container, obj):
if not container in self.deleted_objects:
self.deleted_objects[container] = set()
self.deleted_objects[container].add(obj)
class InternalClient(object):
def __init__(self, containers):
self.containers = containers
def get_account_info(self, *a, **kw):
return len(self.containers.keys()), \
sum([len(self.containers[x]) for x in self.containers])
def iter_containers(self, *a, **kw):
return [{'name': x} for x in self.containers.keys()]
def iter_objects(self, account, container):
return [{'name': x} for x in self.containers[container]]
def delete_container(*a, **kw):
pass
containers = {
0: set('1-one 2-two 3-three'.split()),
1: set('2-two 3-three 4-four'.split()),
2: set('5-five 6-six'.split()),
3: set('7-seven'.split()),
}
x = ObjectExpirer({})
x.swift = InternalClient(containers)
deleted_objects = {}
for i in xrange(0, 3):
x.process = i
x.run_once()
self.assertNotEqual(deleted_objects, x.deleted_objects)
deleted_objects = deepcopy(x.deleted_objects)
self.assertEqual(containers, deleted_objects)
def test_delete_object(self):
class InternalClient(object):
def __init__(self, test, account, container, obj):
self.test = test
self.account = account
self.container = container
self.obj = obj
self.delete_object_called = False
def delete_object(self, account, container, obj):
self.test.assertEqual(self.account, account)
self.test.assertEqual(self.container, container)
self.test.assertEqual(self.obj, obj)
self.delete_object_called = True
class DeleteActualObject(object):
def __init__(self, test, actual_obj, timestamp):
self.test = test
self.actual_obj = actual_obj
self.timestamp = timestamp
self.called = False
def __call__(self, actual_obj, timestamp):
self.test.assertEqual(self.actual_obj, actual_obj)
self.test.assertEqual(self.timestamp, timestamp)
self.called = True
account = 'account'
container = 'container'
obj = 'obj'
actual_obj = 'actual_obj'
timestamp = 'timestamp'
x = expirer.ObjectExpirer({})
x.logger = FakeLogger()
x.swift = \
InternalClient(self, x.expiring_objects_account, container, obj)
x.delete_actual_object = \
DeleteActualObject(self, actual_obj, timestamp)
x.delete_object(actual_obj, timestamp, container, obj)
self.assertTrue(x.swift.delete_object_called)
self.assertTrue(x.delete_actual_object.called)
def test_report(self):
x = expirer.ObjectExpirer({})
x.logger = FakeLogger()
x.report()
self.assertEquals(x.logger.log_dict['info'], [])
self.assertEqual(x.logger.log_dict['info'], [])
x.logger._clear()
x.report(final=True)
@ -79,7 +232,7 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = 'throw error because a string does not have needed methods'
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
self.assertEqual(x.logger.log_dict['exception'],
[(("Unhandled exception",), {},
"'str' object has no attribute "
"'get_account_info'")])
@ -96,7 +249,7 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = InternalClient()
x.run_once()
self.assertEquals(
self.assertEqual(
x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -123,7 +276,7 @@ class TestObjectExpirer(TestCase):
for exccall in x.logger.log_dict['exception']:
self.assertTrue(
'This should not have been called' not in exccall[0][0])
self.assertEquals(
self.assertEqual(
x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -134,7 +287,7 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = InternalClient([{'name': str(int(time() - 86400))}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
self.assertEqual(x.logger.log_dict['exception'],
[(('Unhandled exception',), {},
str(Exception('This should not have been called')))])
@ -167,7 +320,7 @@ class TestObjectExpirer(TestCase):
for exccall in x.logger.log_dict['exception']:
self.assertTrue(
'This should not have been called' not in exccall[0][0])
self.assertEquals(x.logger.log_dict['info'],
self.assertEqual(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
(('Pass completed in 0s; 0 objects expired',), {})])
@ -184,7 +337,7 @@ class TestObjectExpirer(TestCase):
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
self.assertEqual(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj '
'This should not have been called' % (ts, ts)])
@ -227,10 +380,10 @@ class TestObjectExpirer(TestCase):
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
self.assertEqual(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj '
'failed to delete actual object' % (ts, ts)])
self.assertEquals(x.logger.log_dict['info'],
self.assertEqual(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
(('Pass completed in 0s; 0 objects expired',), {})])
@ -247,7 +400,7 @@ class TestObjectExpirer(TestCase):
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
self.assertEqual(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj This should '
'not have been called' % (ts, ts)])
@ -275,12 +428,12 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
x.logger = FakeLogger()
x.delete_actual_object = lambda o, t: None
self.assertEquals(x.report_objects, 0)
self.assertEqual(x.report_objects, 0)
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % int(time() - 86400)}])
x.run_once()
self.assertEquals(x.report_objects, 1)
self.assertEquals(x.logger.log_dict['info'],
self.assertEqual(x.report_objects, 1)
self.assertEqual(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
(('Pass completed in 0s; 1 objects expired',), {})])
@ -315,12 +468,12 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
x.logger = FakeLogger()
x.delete_actual_object = delete_actual_object_test_for_unicode
self.assertEquals(x.report_objects, 0)
self.assertEqual(x.report_objects, 0)
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': u'%d-actual-obj' % int(time() - 86400)}])
x.run_once()
self.assertEquals(x.report_objects, 1)
self.assertEquals(x.logger.log_dict['info'],
self.assertEqual(x.report_objects, 1)
self.assertEqual(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
(('Pass completed in 0s; 1 objects expired',), {})])
@ -373,20 +526,20 @@ class TestObjectExpirer(TestCase):
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting, [
self.assertEqual(sorted(excswhiledeleting), sorted([
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts, ots),
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting container %d failed to delete '
'container' % (cts,),
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting container %d failed to delete '
'container' % (cts + 1,)])
self.assertEquals(x.logger.log_dict['info'],
'container' % (cts + 1,)]))
self.assertEqual(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
(('Pass completed in 0s; 0 objects expired',), {})])
@ -412,8 +565,8 @@ class TestObjectExpirer(TestCase):
finally:
expirer.random = orig_random
expirer.sleep = orig_sleep
self.assertEquals(str(err), 'test_run_forever')
self.assertEquals(last_not_sleep, 0.5 * interval)
self.assertEqual(str(err), 'test_run_forever')
self.assertEqual(last_not_sleep, 0.5 * interval)
def test_run_forever_catches_usual_exceptions(self):
raises = [0]
@ -435,8 +588,8 @@ class TestObjectExpirer(TestCase):
pass
finally:
expirer.sleep = orig_sleep
self.assertEquals(str(err), 'exiting exception 2')
self.assertEquals(x.logger.log_dict['exception'],
self.assertEqual(str(err), 'exiting exception 2')
self.assertEqual(x.logger.log_dict['exception'],
[(('Unhandled exception',), {},
'exception 1')])
@ -453,7 +606,7 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
ts = '1234'
x.delete_actual_object('/path/to/object', ts)
self.assertEquals(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
def test_delete_actual_object_nourlquoting(self):
# delete_actual_object should not do its own url quoting because
@ -470,8 +623,8 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
ts = '1234'
x.delete_actual_object('/path/to/object name', ts)
self.assertEquals(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEquals(got_env[0]['PATH_INFO'], '/v1/path/to/object name')
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name')
def test_delete_actual_object_handles_404(self):
@ -513,7 +666,7 @@ class TestObjectExpirer(TestCase):
exc = err
finally:
pass
self.assertEquals(503, exc.resp.status_int)
self.assertEqual(503, exc.resp.status_int)
def test_delete_actual_object_quotes(self):
name = 'this name should get quoted'
@ -522,7 +675,7 @@ class TestObjectExpirer(TestCase):
x.swift.make_request = mock.MagicMock()
x.delete_actual_object(name, timestamp)
x.swift.make_request.assert_called_once()
self.assertEquals(x.swift.make_request.call_args[0][1],
self.assertEqual(x.swift.make_request.call_args[0][1],
'/v1/' + urllib.quote(name))