Add tasks_per_second option to expirer
This allows operators to throttle expirers as needed. Partial-Bug: #1784753 Change-Id: If75dabb431bddd4ad6100e41395bb6c31a4ce569
This commit is contained in:
parent
6b7eecab5d
commit
b05ad82959
@ -50,6 +50,9 @@
|
|||||||
# must be set to at least 1
|
# must be set to at least 1
|
||||||
# concurrency = 1
|
# concurrency = 1
|
||||||
#
|
#
|
||||||
|
# deletes can be ratelimited to prevent the expirer from overwhelming the cluster
|
||||||
|
# tasks_per_second = 50.0
|
||||||
|
#
|
||||||
# processes is how many parts to divide the work into, one part per process
|
# processes is how many parts to divide the work into, one part per process
|
||||||
# that will be doing the work
|
# that will be doing the work
|
||||||
# processes set 0 means that a single process will be doing all the work
|
# processes set 0 means that a single process will be doing all the work
|
||||||
|
@ -518,6 +518,9 @@ use = egg:swift#recon
|
|||||||
# must be set to at least 1
|
# must be set to at least 1
|
||||||
# concurrency = 1
|
# concurrency = 1
|
||||||
#
|
#
|
||||||
|
# deletes can be ratelimited to prevent the expirer from overwhelming the cluster
|
||||||
|
# tasks_per_second = 50.0
|
||||||
|
#
|
||||||
# The expirer will re-attempt expiring if the source object is not available
|
# The expirer will re-attempt expiring if the source object is not available
|
||||||
# up to reclaim_age seconds before it gives up and deletes the entry in the
|
# up to reclaim_age seconds before it gives up and deletes the entry in the
|
||||||
# queue.
|
# queue.
|
||||||
|
@ -29,7 +29,8 @@ from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
|
|||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
||||||
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
||||||
Timestamp, config_true_value, normalize_delete_at_timestamp
|
Timestamp, config_true_value, normalize_delete_at_timestamp, \
|
||||||
|
RateLimitedIterator
|
||||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||||
HTTP_PRECONDITION_FAILED
|
HTTP_PRECONDITION_FAILED
|
||||||
from swift.common.swob import wsgi_quote, str_to_wsgi
|
from swift.common.swob import wsgi_quote, str_to_wsgi
|
||||||
@ -79,6 +80,7 @@ class ObjectExpirer(Daemon):
|
|||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.logger = logger or get_logger(conf, log_route='object-expirer')
|
self.logger = logger or get_logger(conf, log_route='object-expirer')
|
||||||
self.interval = int(conf.get('interval') or 300)
|
self.interval = int(conf.get('interval') or 300)
|
||||||
|
self.tasks_per_second = float(conf.get('tasks_per_second', 50.0))
|
||||||
|
|
||||||
self.conf_path = \
|
self.conf_path = \
|
||||||
self.conf.get('__file__') or '/etc/swift/object-expirer.conf'
|
self.conf.get('__file__') or '/etc/swift/object-expirer.conf'
|
||||||
@ -351,8 +353,10 @@ class ObjectExpirer(Daemon):
|
|||||||
delete_task_iter = \
|
delete_task_iter = \
|
||||||
self.round_robin_order(self.iter_task_to_expire(
|
self.round_robin_order(self.iter_task_to_expire(
|
||||||
task_account_container_list, my_index, divisor))
|
task_account_container_list, my_index, divisor))
|
||||||
|
rate_limited_iter = RateLimitedIterator(
|
||||||
for delete_task in delete_task_iter:
|
delete_task_iter,
|
||||||
|
elements_per_second=self.tasks_per_second)
|
||||||
|
for delete_task in rate_limited_iter:
|
||||||
pool.spawn_n(self.delete_object, **delete_task)
|
pool.spawn_n(self.delete_object, **delete_task)
|
||||||
|
|
||||||
pool.waitall()
|
pool.waitall()
|
||||||
|
@ -573,6 +573,28 @@ class TestObjectExpirer(TestCase):
|
|||||||
'Pass completed in 0s; 10 objects expired',
|
'Pass completed in 0s; 10 objects expired',
|
||||||
])
|
])
|
||||||
|
|
||||||
|
def test_run_once_rate_limited(self):
|
||||||
|
x = expirer.ObjectExpirer(
|
||||||
|
dict(self.conf, tasks_per_second=2),
|
||||||
|
logger=self.logger,
|
||||||
|
swift=self.fake_swift)
|
||||||
|
x.pop_queue = lambda a, c, o: None
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
def fake_ratelimiter(iterator, elements_per_second):
|
||||||
|
captured_iter = list(iterator)
|
||||||
|
calls.append((captured_iter, elements_per_second))
|
||||||
|
return captured_iter
|
||||||
|
|
||||||
|
with mock.patch('swift.obj.expirer.RateLimitedIterator',
|
||||||
|
side_effect=fake_ratelimiter):
|
||||||
|
x.run_once()
|
||||||
|
self.assertEqual(calls, [([
|
||||||
|
self.make_task(self.past_time, target_path)
|
||||||
|
for target_path in self.expired_target_path_list
|
||||||
|
], 2)])
|
||||||
|
|
||||||
def test_skip_task_account_without_task_container(self):
|
def test_skip_task_account_without_task_container(self):
|
||||||
fake_swift = FakeInternalClient({
|
fake_swift = FakeInternalClient({
|
||||||
# task account has no containers
|
# task account has no containers
|
||||||
|
Loading…
x
Reference in New Issue
Block a user