Delete expired objects in slightly smarter way.

When the expirer tries to delete customer objects, if it just walks through the
containers in order the deamon will tend to send DELETEs to the same container
highly concurrently. This will in turn create a lot of asyncs because of all
the concurrent deletes. If the deletes were spread out to multiple containers
it would improve performance and decrease the number of asyncs made.

Change-Id: I3d08118c197b7f18dd7e880bd5664508934ffd24
This commit is contained in:
David Goetz 2014-08-15 15:54:05 -07:00 committed by David Goetz
parent 7f890f8e40
commit eff9ab74a4
2 changed files with 104 additions and 47 deletions

View File

@ -25,12 +25,14 @@ from eventlet.greenpool import GreenPool
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient, UnexpectedResponse from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.common.utils import get_logger, dump_recon_cache from swift.common.utils import get_logger, dump_recon_cache, split_path
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED HTTP_PRECONDITION_FAILED
from swift.container.reconciler import direct_delete_container_entry from swift.container.reconciler import direct_delete_container_entry
MAX_OBJECTS_TO_CACHE = 100000
class ObjectExpirer(Daemon): class ObjectExpirer(Daemon):
""" """
@ -85,6 +87,66 @@ class ObjectExpirer(Daemon):
(elapsed, self.report_objects)) (elapsed, self.report_objects))
self.report_last_time = time() self.report_last_time = time()
def iter_cont_objs_to_expire(self):
"""
Yields (container, obj) tuples to be deleted
"""
obj_cache = {}
cnt = 0
all_containers = set()
for c in self.swift.iter_containers(self.expiring_objects_account):
container = c['name']
timestamp = int(container)
if timestamp > int(time()):
break
all_containers.add(container)
for o in self.swift.iter_objects(self.expiring_objects_account,
container):
obj = o['name'].encode('utf8')
timestamp, actual_obj = obj.split('-', 1)
timestamp = int(timestamp)
if timestamp > int(time()):
break
try:
cust_account, cust_cont, cust_obj = \
split_path('/' + actual_obj, 3, 3, True)
cache_key = '%s/%s' % (cust_account, cust_cont)
except ValueError:
cache_key = None
if self.processes > 0:
obj_process = int(
hashlib.md5('%s/%s' % (str(container), obj)).
hexdigest(), 16)
if obj_process % self.processes != self.process:
continue
if cache_key not in obj_cache:
obj_cache[cache_key] = []
obj_cache[cache_key].append((container, obj))
cnt += 1
if cnt > MAX_OBJECTS_TO_CACHE:
while obj_cache:
for key in obj_cache.keys():
if obj_cache[key]:
yield obj_cache[key].pop()
cnt -= 1
else:
del obj_cache[key]
while obj_cache:
for key in obj_cache.keys():
if obj_cache[key]:
yield obj_cache[key].pop()
else:
del obj_cache[key]
for container in all_containers:
yield (container, None)
def run_once(self, *args, **kwargs): def run_once(self, *args, **kwargs):
""" """
Executes a single pass, looking for objects to expire. Executes a single pass, looking for objects to expire.
@ -96,9 +158,9 @@ class ObjectExpirer(Daemon):
These will override the values from the config file if These will override the values from the config file if
provided. provided.
""" """
processes, process = self.get_process_values(kwargs) self.get_process_values(kwargs)
pool = GreenPool(self.concurrency) pool = GreenPool(self.concurrency)
containers_to_delete = [] containers_to_delete = set([])
self.report_first_time = self.report_last_time = time() self.report_first_time = self.report_last_time = time()
self.report_objects = 0 self.report_objects = 0
try: try:
@ -107,28 +169,21 @@ class ObjectExpirer(Daemon):
self.swift.get_account_info(self.expiring_objects_account) self.swift.get_account_info(self.expiring_objects_account)
self.logger.info(_('Pass beginning; %s possible containers; %s ' self.logger.info(_('Pass beginning; %s possible containers; %s '
'possible objects') % (containers, objects)) 'possible objects') % (containers, objects))
for c in self.swift.iter_containers(self.expiring_objects_account):
container = c['name'] for container, obj in self.iter_cont_objs_to_expire():
timestamp = int(container) containers_to_delete.add(container)
if not obj:
continue
timestamp, actual_obj = obj.split('-', 1)
timestamp = int(timestamp)
if timestamp > int(time()): if timestamp > int(time()):
break break
containers_to_delete.append(container) pool.spawn_n(
for o in self.swift.iter_objects(self.expiring_objects_account, self.delete_object, actual_obj, timestamp,
container): container, obj)
obj = o['name'].encode('utf8')
if processes > 0:
obj_process = int(
hashlib.md5('%s/%s' % (str(container), obj)).
hexdigest(), 16)
if obj_process % processes != process:
continue
timestamp, actual_obj = obj.split('-', 1)
timestamp = int(timestamp)
if timestamp > int(time()):
break
pool.spawn_n(
self.delete_object, actual_obj, timestamp,
container, obj)
pool.waitall() pool.waitall()
for container in containers_to_delete: for container in containers_to_delete:
try: try:
@ -167,38 +222,32 @@ class ObjectExpirer(Daemon):
def get_process_values(self, kwargs): def get_process_values(self, kwargs):
""" """
Gets the processes, process from the kwargs if those values exist. Sets self.processes and self.process from the kwargs if those
values exist, otherwise, leaves those values as they were set in
Otherwise, return processes, process set in the config file. the config file.
:param kwargs: Keyword args passed into the run_forever(), run_once() :param kwargs: Keyword args passed into the run_forever(), run_once()
methods. They have values specified on the command methods. They have values specified on the command
line when the daemon is run. line when the daemon is run.
""" """
if kwargs.get('processes') is not None: if kwargs.get('processes') is not None:
processes = int(kwargs['processes']) self.processes = int(kwargs['processes'])
else:
processes = self.processes
if kwargs.get('process') is not None: if kwargs.get('process') is not None:
process = int(kwargs['process']) self.process = int(kwargs['process'])
else:
process = self.process
if process < 0: if self.process < 0:
raise ValueError( raise ValueError(
'process must be an integer greater than or equal to 0') 'process must be an integer greater than or equal to 0')
if processes < 0: if self.processes < 0:
raise ValueError( raise ValueError(
'processes must be an integer greater than or equal to 0') 'processes must be an integer greater than or equal to 0')
if processes and process >= processes: if self.processes and self.process >= self.processes:
raise ValueError( raise ValueError(
'process must be less than or equal to processes') 'process must be less than or equal to processes')
return processes, process
def delete_object(self, actual_obj, timestamp, container, obj): def delete_object(self, actual_obj, timestamp, container, obj):
start_time = time() start_time = time()
try: try:

View File

@ -65,7 +65,9 @@ class TestObjectExpirer(TestCase):
'processes': 5, 'processes': 5,
'process': 1, 'process': 1,
} }
self.assertEqual((5, 1), x.get_process_values(vals)) x.get_process_values(vals)
self.assertEqual(x.processes, 5)
self.assertEqual(x.process, 1)
def test_get_process_values_from_config(self): def test_get_process_values_from_config(self):
vals = { vals = {
@ -73,7 +75,9 @@ class TestObjectExpirer(TestCase):
'process': 1, 'process': 1,
} }
x = expirer.ObjectExpirer(vals) x = expirer.ObjectExpirer(vals)
self.assertEqual((5, 1), x.get_process_values({})) x.get_process_values({})
self.assertEqual(x.processes, 5)
self.assertEqual(x.process, 1)
def test_get_process_values_negative_process(self): def test_get_process_values_negative_process(self):
vals = { vals = {
@ -129,11 +133,13 @@ class TestObjectExpirer(TestCase):
super(ObjectExpirer, self).__init__(conf) super(ObjectExpirer, self).__init__(conf)
self.processes = 3 self.processes = 3
self.deleted_objects = {} self.deleted_objects = {}
self.obj_containers_in_order = []
def delete_object(self, actual_obj, timestamp, container, obj): def delete_object(self, actual_obj, timestamp, container, obj):
if container not in self.deleted_objects: if container not in self.deleted_objects:
self.deleted_objects[container] = set() self.deleted_objects[container] = set()
self.deleted_objects[container].add(obj) self.deleted_objects[container].add(obj)
self.obj_containers_in_order.append(container)
class InternalClient(object): class InternalClient(object):
@ -172,6 +178,7 @@ class TestObjectExpirer(TestCase):
self.assertEqual(containers[ukey].pop(), self.assertEqual(containers[ukey].pop(),
deleted_objects[ukey].pop().decode('utf8')) deleted_objects[ukey].pop().decode('utf8'))
self.assertEqual(containers, deleted_objects) self.assertEqual(containers, deleted_objects)
self.assertEqual(len(set(x.obj_containers_in_order[:4])), 4)
def test_delete_object(self): def test_delete_object(self):
class InternalClient(object): class InternalClient(object):
@ -451,18 +458,19 @@ class TestObjectExpirer(TestCase):
fake_swift = InternalClient( fake_swift = InternalClient(
[{'name': str(int(time() - 86400))}], [{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % int(time() - 86400)}]) [{'name': '%d-acc/c/actual-obj' % int(time() - 86400)}])
x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift) x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift)
x.delete_actual_object = lambda o, t: None x.delete_actual_object = lambda o, t: None
x.pop_queue = lambda c, o: None x.pop_queue = lambda c, o: None
self.assertEqual(x.report_objects, 0) self.assertEqual(x.report_objects, 0)
x.run_once() with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0):
self.assertEqual(x.report_objects, 1) x.run_once()
self.assertEqual( self.assertEqual(x.report_objects, 1)
x.logger.log_dict['info'], self.assertEqual(
[(('Pass beginning; 1 possible containers; ' x.logger.log_dict['info'],
'2 possible objects',), {}), [(('Pass beginning; 1 possible containers; '
(('Pass completed in 0s; 1 objects expired',), {})]) '2 possible objects',), {}),
(('Pass completed in 0s; 1 objects expired',), {})])
def test_delete_actual_object_does_not_get_unicode(self): def test_delete_actual_object_does_not_get_unicode(self):
class InternalClient(object): class InternalClient(object):