Refactor expirer's task round robin implementation

The object-expirer reorders expiration tasks to avoid continuously
deleting objects from a single container.

To make the expirer's task queue update patch [1] easier to review,
this patch refactors the implementation of the order change. In this
patch, the order change is extracted into its own function.

In [1], there will be two implementations: one for the legacy task queue
and one for the general task queue. The two implementations have similar
code. This patch helps to avoid duplicating code between them.

Besides extracting that function, this patch also makes the following changes:
- Separate container iteration and object iteration to avoid the generator
  termination with (container, None) tuple.
- Use the Timestamp class for delete_timestamp to be consistent with other modules
- Change yielded delete task object info from tuple to dict because that
  includes several complex info (e.g. task_container, task_object,
  and target_path)
- Fix minor docs and tests that depend on the changes above

[1]: https://review.openstack.org/#/c/517389

Co-Authored-By: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp>

Change-Id: Ibf61eb1f767a48cb457dd494e1f7c12acfd205de
This commit is contained in:
Kazuhiro MIYAHARA 2018-01-26 07:54:31 +00:00 committed by Kota Tsuyuzaki
parent b33941feda
commit 303635348b
2 changed files with 143 additions and 81 deletions

View File

@ -19,6 +19,7 @@ from random import random
from time import time from time import time
from os.path import join from os.path import join
from swift import gettext_ as _ from swift import gettext_ as _
from collections import defaultdict, deque
import hashlib import hashlib
from eventlet import sleep, Timeout from eventlet import sleep, Timeout
@ -26,7 +27,8 @@ from eventlet.greenpool import GreenPool
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient, UnexpectedResponse from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.common.utils import get_logger, dump_recon_cache, split_path from swift.common.utils import get_logger, dump_recon_cache, split_path, \
Timestamp
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED HTTP_PRECONDITION_FAILED
@ -93,65 +95,84 @@ class ObjectExpirer(Daemon):
'time': elapsed, 'objects': self.report_objects}) 'time': elapsed, 'objects': self.report_objects})
self.report_last_time = time() self.report_last_time = time()
def iter_cont_objs_to_expire(self): def round_robin_order(self, task_iter):
""" """
Yields (container, obj) tuples to be deleted Change order of expiration tasks to avoid deleting objects in a
certain container continuously.
:param task_iter: An iterator of delete-task dicts, which should each
have a ``target_path`` key.
""" """
obj_cache = {} obj_cache = defaultdict(deque)
cnt = 0 cnt = 0
all_containers = set() def dump_obj_cache_in_round_robin():
while obj_cache:
for key in sorted(obj_cache):
if obj_cache[key]:
yield obj_cache[key].popleft()
else:
del obj_cache[key]
for delete_task in task_iter:
try:
target_account, target_container, _junk = \
split_path('/' + delete_task['target_path'], 3, 3, True)
cache_key = '%s/%s' % (target_account, target_container)
except ValueError:
cache_key = None
obj_cache[cache_key].append(delete_task)
cnt += 1
if cnt > MAX_OBJECTS_TO_CACHE:
for task in dump_obj_cache_in_round_robin():
yield task
cnt = 0
for task in dump_obj_cache_in_round_robin():
yield task
def iter_task_containers_to_expire(self):
"""
Yields container name under the expiring_objects_account if
the container name (i.e. timestamp) is past.
"""
for c in self.swift.iter_containers(self.expiring_objects_account): for c in self.swift.iter_containers(self.expiring_objects_account):
container = str(c['name']) task_container = str(c['name'])
timestamp = int(container) timestamp = Timestamp(task_container)
if timestamp > int(time()): if timestamp > Timestamp.now():
break break
all_containers.add(container) yield task_container
def iter_task_to_expire(self, task_containers):
"""
Yields task expire info dict which consists of task_container,
task_object, target_path, and delete_timestamp
"""
for task_container in task_containers:
for o in self.swift.iter_objects(self.expiring_objects_account, for o in self.swift.iter_objects(self.expiring_objects_account,
container): task_container):
obj = o['name'].encode('utf8') task_object = o['name'].encode('utf8')
timestamp, actual_obj = obj.split('-', 1) delete_timestamp, target_path = task_object.split('-', 1)
timestamp = int(timestamp) delete_timestamp = Timestamp(delete_timestamp)
if timestamp > int(time()): if delete_timestamp > Timestamp.now():
# we shouldn't yield the object that doesn't reach
# the expiration date yet.
break break
try:
cust_account, cust_cont, cust_obj = \
split_path('/' + actual_obj, 3, 3, True)
cache_key = '%s/%s' % (cust_account, cust_cont)
except ValueError:
cache_key = None
if self.processes > 0: if self.processes > 0:
obj_process = int( obj_process = int(
hashlib.md5('%s/%s' % (container, obj)). hashlib.md5('%s/%s' % (task_container, task_object)).
hexdigest(), 16) hexdigest(), 16)
if obj_process % self.processes != self.process: if obj_process % self.processes != self.process:
continue continue
if cache_key not in obj_cache: yield {'task_container': task_container,
obj_cache[cache_key] = [] 'task_object': task_object,
obj_cache[cache_key].append((container, obj)) 'target_path': target_path,
cnt += 1 'delete_timestamp': delete_timestamp}
if cnt > MAX_OBJECTS_TO_CACHE:
while obj_cache:
for key in obj_cache.keys():
if obj_cache[key]:
yield obj_cache[key].pop()
cnt -= 1
else:
del obj_cache[key]
while obj_cache:
for key in obj_cache.keys():
if obj_cache[key]:
yield obj_cache[key].pop()
else:
del obj_cache[key]
for container in all_containers:
yield (container, None)
def run_once(self, *args, **kwargs): def run_once(self, *args, **kwargs):
""" """
@ -166,7 +187,6 @@ class ObjectExpirer(Daemon):
""" """
self.get_process_values(kwargs) self.get_process_values(kwargs)
pool = GreenPool(self.concurrency) pool = GreenPool(self.concurrency)
containers_to_delete = set([])
self.report_first_time = self.report_last_time = time() self.report_first_time = self.report_last_time = time()
self.report_objects = 0 self.report_objects = 0
try: try:
@ -178,22 +198,19 @@ class ObjectExpirer(Daemon):
'%(objects)s possible objects') % { '%(objects)s possible objects') % {
'containers': containers, 'objects': objects}) 'containers': containers, 'objects': objects})
for container, obj in self.iter_cont_objs_to_expire(): task_containers = list(self.iter_task_containers_to_expire())
containers_to_delete.add(container)
if not obj: # delete_task_iter is a generator to yield a dict of
continue # task_container, task_object, delete_timestamp, target_path
# to handle delete actual object and pop the task from the queue.
delete_task_iter = self.round_robin_order(
self.iter_task_to_expire(task_containers))
timestamp, actual_obj = obj.split('-', 1) for delete_task in delete_task_iter:
timestamp = int(timestamp) pool.spawn_n(self.delete_object, **delete_task)
if timestamp > int(time()):
break
pool.spawn_n(
self.delete_object, actual_obj, timestamp,
container, obj)
pool.waitall() pool.waitall()
for container in containers_to_delete: for container in task_containers:
try: try:
self.swift.delete_container( self.swift.delete_container(
self.expiring_objects_account, self.expiring_objects_account,
@ -257,33 +274,35 @@ class ObjectExpirer(Daemon):
raise ValueError( raise ValueError(
'process must be less than processes') 'process must be less than processes')
def delete_object(self, actual_obj, timestamp, container, obj): def delete_object(self, target_path, delete_timestamp,
task_container, task_object):
start_time = time() start_time = time()
try: try:
try: try:
self.delete_actual_object(actual_obj, timestamp) self.delete_actual_object(target_path, delete_timestamp)
except UnexpectedResponse as err: except UnexpectedResponse as err:
if err.resp.status_int not in {HTTP_NOT_FOUND, if err.resp.status_int not in {HTTP_NOT_FOUND,
HTTP_PRECONDITION_FAILED}: HTTP_PRECONDITION_FAILED}:
raise raise
if float(timestamp) > time() - self.reclaim_age: if float(delete_timestamp) > time() - self.reclaim_age:
# we'll have to retry the DELETE later # we'll have to retry the DELETE later
raise raise
self.pop_queue(container, obj) self.pop_queue(task_container, task_object)
self.report_objects += 1 self.report_objects += 1
self.logger.increment('objects') self.logger.increment('objects')
except UnexpectedResponse as err: except UnexpectedResponse as err:
self.logger.increment('errors') self.logger.increment('errors')
self.logger.error( self.logger.error(
'Unexpected response while deleting object %(container)s ' 'Unexpected response while deleting object %(container)s '
'%(obj)s: %(err)s' % {'container': container, 'obj': obj, '%(obj)s: %(err)s' % {
'err': str(err.resp.status_int)}) 'container': task_container, 'obj': task_object,
'err': str(err.resp.status_int)})
except (Exception, Timeout) as err: except (Exception, Timeout) as err:
self.logger.increment('errors') self.logger.increment('errors')
self.logger.exception( self.logger.exception(
'Exception while deleting object %(container)s %(obj)s' 'Exception while deleting object %(container)s %(obj)s'
' %(err)s' % {'container': container, ' %(err)s' % {'container': task_container,
'obj': obj, 'err': str(err)}) 'obj': task_object, 'err': str(err)})
self.logger.timing_since('timing', start_time) self.logger.timing_since('timing', start_time)
self.report() self.report()
@ -304,15 +323,16 @@ class ObjectExpirer(Daemon):
:param actual_obj: The name of the end-user object to delete: :param actual_obj: The name of the end-user object to delete:
'<account>/<container>/<object>' '<account>/<container>/<object>'
:param timestamp: The timestamp the X-Delete-At value must match to :param timestamp: The swift.common.utils.Timestamp instance the
perform the actual delete. X-Delete-At value must match to perform the actual
delete.
:raises UnexpectedResponse: if the delete was unsuccessful and :raises UnexpectedResponse: if the delete was unsuccessful and
should be retried later should be retried later
""" """
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/')) path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
self.swift.make_request( self.swift.make_request(
'DELETE', path, 'DELETE', path,
{'X-If-Delete-At': str(timestamp), {'X-If-Delete-At': timestamp.normal,
'X-Timestamp': str(timestamp), 'X-Timestamp': timestamp.normal,
'X-Backend-Clean-Expiring-Object-Queue': 'no'}, 'X-Backend-Clean-Expiring-Object-Queue': 'no'},
(2, HTTP_CONFLICT)) (2, HTTP_CONFLICT))

View File

@ -26,6 +26,7 @@ import six
from six.moves import urllib from six.moves import urllib
from swift.common import internal_client, utils, swob from swift.common import internal_client, utils, swob
from swift.common.utils import Timestamp
from swift.obj import expirer from swift.obj import expirer
@ -215,11 +216,12 @@ class TestObjectExpirer(TestCase):
self.deleted_objects = {} self.deleted_objects = {}
self.obj_containers_in_order = [] self.obj_containers_in_order = []
def delete_object(self, actual_obj, timestamp, container, obj): def delete_object(self, target_path, delete_timestamp,
if container not in self.deleted_objects: task_container, task_object):
self.deleted_objects[container] = set() if task_container not in self.deleted_objects:
self.deleted_objects[container].add(obj) self.deleted_objects[task_container] = set()
self.obj_containers_in_order.append(container) self.deleted_objects[task_container].add(task_object)
self.obj_containers_in_order.append(task_container)
aco_dict = { aco_dict = {
'.expiring_objects': { '.expiring_objects': {
@ -321,6 +323,45 @@ class TestObjectExpirer(TestCase):
self.assertTrue( self.assertTrue(
'so far' in str(x.logger.get_lines_for_level('info'))) 'so far' in str(x.logger.get_lines_for_level('info')))
def test_round_robin_order(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
task_con_obj_list = [
# objects in 0000 timestamp container
{'task_container': '0000', 'task_object': '0000-a/c0/o0',
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'},
{'task_container': '0000', 'task_object': '0000-a/c0/o1',
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'},
# objects in 0001 timestamp container
{'task_container': '0001', 'task_object': '0001-a/c1/o0',
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'},
{'task_container': '0001', 'task_object': '0001-a/c1/o1',
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'},
# objects in 0002 timestamp container
{'task_container': '0002', 'task_object': '0002-a/c2/o0',
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'},
{'task_container': '0002', 'task_object': '0002-a/c2/o1',
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'},
]
result = list(x.round_robin_order(task_con_obj_list))
# sorted by popping one object to delete for each target_container
expected = [
# objects in 0000 timestamp container
{'task_container': '0000', 'task_object': '0000-a/c0/o0',
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'},
{'task_container': '0001', 'task_object': '0001-a/c1/o0',
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'},
{'task_container': '0002', 'task_object': '0002-a/c2/o0',
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'},
{'task_container': '0000', 'task_object': '0000-a/c0/o1',
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'},
{'task_container': '0001', 'task_object': '0001-a/c1/o1',
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'},
{'task_container': '0002', 'task_object': '0002-a/c2/o1',
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'},
]
self.assertEqual(expected, result)
def test_run_once_nothing_to_do(self): def test_run_once_nothing_to_do(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger) x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x.swift = 'throw error because a string does not have needed methods' x.swift = 'throw error because a string does not have needed methods'
@ -621,7 +662,7 @@ class TestObjectExpirer(TestCase):
internal_client.loadapp = lambda *a, **kw: fake_app internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({}) x = expirer.ObjectExpirer({})
ts = '1234' ts = Timestamp('1234')
x.delete_actual_object('/path/to/object', ts) x.delete_actual_object('/path/to/object', ts)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'],
@ -640,7 +681,7 @@ class TestObjectExpirer(TestCase):
internal_client.loadapp = lambda *a, **kw: fake_app internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({}) x = expirer.ObjectExpirer({})
ts = '1234' ts = Timestamp('1234')
x.delete_actual_object('/path/to/object name', ts) x.delete_actual_object('/path/to/object name', ts)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts) self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'],
@ -659,11 +700,12 @@ class TestObjectExpirer(TestCase):
internal_client.loadapp = lambda *a, **kw: fake_app internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({}) x = expirer.ObjectExpirer({})
ts = Timestamp('1234')
if should_raise: if should_raise:
with self.assertRaises(internal_client.UnexpectedResponse): with self.assertRaises(internal_client.UnexpectedResponse):
x.delete_actual_object('/path/to/object', '1234') x.delete_actual_object('/path/to/object', ts)
else: else:
x.delete_actual_object('/path/to/object', '1234') x.delete_actual_object('/path/to/object', ts)
self.assertEqual(calls[0], 1) self.assertEqual(calls[0], 1)
# object was deleted and tombstone reaped # object was deleted and tombstone reaped
@ -688,7 +730,7 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({}) x = expirer.ObjectExpirer({})
exc = None exc = None
try: try:
x.delete_actual_object('/path/to/object', '1234') x.delete_actual_object('/path/to/object', Timestamp('1234'))
except Exception as err: except Exception as err:
exc = err exc = err
finally: finally:
@ -697,7 +739,7 @@ class TestObjectExpirer(TestCase):
def test_delete_actual_object_quotes(self): def test_delete_actual_object_quotes(self):
name = 'this name should get quoted' name = 'this name should get quoted'
timestamp = '1366063156.863045' timestamp = Timestamp('1366063156.863045')
x = expirer.ObjectExpirer({}) x = expirer.ObjectExpirer({})
x.swift.make_request = mock.Mock() x.swift.make_request = mock.Mock()
x.swift.make_request.return_value.status_int = 204 x.swift.make_request.return_value.status_int = 204
@ -708,7 +750,7 @@ class TestObjectExpirer(TestCase):
def test_delete_actual_object_queue_cleaning(self): def test_delete_actual_object_queue_cleaning(self):
name = 'something' name = 'something'
timestamp = '1515544858.80602' timestamp = Timestamp('1515544858.80602')
x = expirer.ObjectExpirer({}) x = expirer.ObjectExpirer({})
x.swift.make_request = mock.MagicMock() x.swift.make_request = mock.MagicMock()
x.delete_actual_object(name, timestamp) x.delete_actual_object(name, timestamp)