Merge "Delete expired objects in slightly smarter way."
This commit is contained in:
commit
e7dfe58298
@ -25,12 +25,14 @@ from eventlet.greenpool import GreenPool
|
||||
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
||||
from swift.common.utils import get_logger, dump_recon_cache
|
||||
from swift.common.utils import get_logger, dump_recon_cache, split_path
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||
HTTP_PRECONDITION_FAILED
|
||||
|
||||
from swift.container.reconciler import direct_delete_container_entry
|
||||
|
||||
MAX_OBJECTS_TO_CACHE = 100000
|
||||
|
||||
|
||||
class ObjectExpirer(Daemon):
|
||||
"""
|
||||
@ -85,6 +87,66 @@ class ObjectExpirer(Daemon):
|
||||
(elapsed, self.report_objects))
|
||||
self.report_last_time = time()
|
||||
|
||||
def iter_cont_objs_to_expire(self):
|
||||
"""
|
||||
Yields (container, obj) tuples to be deleted
|
||||
"""
|
||||
obj_cache = {}
|
||||
cnt = 0
|
||||
|
||||
all_containers = set()
|
||||
|
||||
for c in self.swift.iter_containers(self.expiring_objects_account):
|
||||
container = c['name']
|
||||
timestamp = int(container)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
all_containers.add(container)
|
||||
for o in self.swift.iter_objects(self.expiring_objects_account,
|
||||
container):
|
||||
obj = o['name'].encode('utf8')
|
||||
timestamp, actual_obj = obj.split('-', 1)
|
||||
timestamp = int(timestamp)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
try:
|
||||
cust_account, cust_cont, cust_obj = \
|
||||
split_path('/' + actual_obj, 3, 3, True)
|
||||
cache_key = '%s/%s' % (cust_account, cust_cont)
|
||||
except ValueError:
|
||||
cache_key = None
|
||||
|
||||
if self.processes > 0:
|
||||
obj_process = int(
|
||||
hashlib.md5('%s/%s' % (str(container), obj)).
|
||||
hexdigest(), 16)
|
||||
if obj_process % self.processes != self.process:
|
||||
continue
|
||||
|
||||
if cache_key not in obj_cache:
|
||||
obj_cache[cache_key] = []
|
||||
obj_cache[cache_key].append((container, obj))
|
||||
cnt += 1
|
||||
|
||||
if cnt > MAX_OBJECTS_TO_CACHE:
|
||||
while obj_cache:
|
||||
for key in obj_cache.keys():
|
||||
if obj_cache[key]:
|
||||
yield obj_cache[key].pop()
|
||||
cnt -= 1
|
||||
else:
|
||||
del obj_cache[key]
|
||||
|
||||
while obj_cache:
|
||||
for key in obj_cache.keys():
|
||||
if obj_cache[key]:
|
||||
yield obj_cache[key].pop()
|
||||
else:
|
||||
del obj_cache[key]
|
||||
|
||||
for container in all_containers:
|
||||
yield (container, None)
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""
|
||||
Executes a single pass, looking for objects to expire.
|
||||
@ -96,9 +158,9 @@ class ObjectExpirer(Daemon):
|
||||
These will override the values from the config file if
|
||||
provided.
|
||||
"""
|
||||
processes, process = self.get_process_values(kwargs)
|
||||
self.get_process_values(kwargs)
|
||||
pool = GreenPool(self.concurrency)
|
||||
containers_to_delete = []
|
||||
containers_to_delete = set([])
|
||||
self.report_first_time = self.report_last_time = time()
|
||||
self.report_objects = 0
|
||||
try:
|
||||
@ -107,28 +169,21 @@ class ObjectExpirer(Daemon):
|
||||
self.swift.get_account_info(self.expiring_objects_account)
|
||||
self.logger.info(_('Pass beginning; %s possible containers; %s '
|
||||
'possible objects') % (containers, objects))
|
||||
for c in self.swift.iter_containers(self.expiring_objects_account):
|
||||
container = c['name']
|
||||
timestamp = int(container)
|
||||
|
||||
for container, obj in self.iter_cont_objs_to_expire():
|
||||
containers_to_delete.add(container)
|
||||
|
||||
if not obj:
|
||||
continue
|
||||
|
||||
timestamp, actual_obj = obj.split('-', 1)
|
||||
timestamp = int(timestamp)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
containers_to_delete.append(container)
|
||||
for o in self.swift.iter_objects(self.expiring_objects_account,
|
||||
container):
|
||||
obj = o['name'].encode('utf8')
|
||||
if processes > 0:
|
||||
obj_process = int(
|
||||
hashlib.md5('%s/%s' % (str(container), obj)).
|
||||
hexdigest(), 16)
|
||||
if obj_process % processes != process:
|
||||
continue
|
||||
timestamp, actual_obj = obj.split('-', 1)
|
||||
timestamp = int(timestamp)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
pool.spawn_n(
|
||||
self.delete_object, actual_obj, timestamp,
|
||||
container, obj)
|
||||
pool.spawn_n(
|
||||
self.delete_object, actual_obj, timestamp,
|
||||
container, obj)
|
||||
|
||||
pool.waitall()
|
||||
for container in containers_to_delete:
|
||||
try:
|
||||
@ -167,38 +222,32 @@ class ObjectExpirer(Daemon):
|
||||
|
||||
def get_process_values(self, kwargs):
|
||||
"""
|
||||
Gets the processes, process from the kwargs if those values exist.
|
||||
|
||||
Otherwise, return processes, process set in the config file.
|
||||
Sets self.processes and self.process from the kwargs if those
|
||||
values exist, otherwise, leaves those values as they were set in
|
||||
the config file.
|
||||
|
||||
:param kwargs: Keyword args passed into the run_forever(), run_once()
|
||||
methods. They have values specified on the command
|
||||
line when the daemon is run.
|
||||
"""
|
||||
if kwargs.get('processes') is not None:
|
||||
processes = int(kwargs['processes'])
|
||||
else:
|
||||
processes = self.processes
|
||||
self.processes = int(kwargs['processes'])
|
||||
|
||||
if kwargs.get('process') is not None:
|
||||
process = int(kwargs['process'])
|
||||
else:
|
||||
process = self.process
|
||||
self.process = int(kwargs['process'])
|
||||
|
||||
if process < 0:
|
||||
if self.process < 0:
|
||||
raise ValueError(
|
||||
'process must be an integer greater than or equal to 0')
|
||||
|
||||
if processes < 0:
|
||||
if self.processes < 0:
|
||||
raise ValueError(
|
||||
'processes must be an integer greater than or equal to 0')
|
||||
|
||||
if processes and process >= processes:
|
||||
if self.processes and self.process >= self.processes:
|
||||
raise ValueError(
|
||||
'process must be less than or equal to processes')
|
||||
|
||||
return processes, process
|
||||
|
||||
def delete_object(self, actual_obj, timestamp, container, obj):
|
||||
start_time = time()
|
||||
try:
|
||||
|
@ -65,7 +65,9 @@ class TestObjectExpirer(TestCase):
|
||||
'processes': 5,
|
||||
'process': 1,
|
||||
}
|
||||
self.assertEqual((5, 1), x.get_process_values(vals))
|
||||
x.get_process_values(vals)
|
||||
self.assertEqual(x.processes, 5)
|
||||
self.assertEqual(x.process, 1)
|
||||
|
||||
def test_get_process_values_from_config(self):
|
||||
vals = {
|
||||
@ -73,7 +75,9 @@ class TestObjectExpirer(TestCase):
|
||||
'process': 1,
|
||||
}
|
||||
x = expirer.ObjectExpirer(vals)
|
||||
self.assertEqual((5, 1), x.get_process_values({}))
|
||||
x.get_process_values({})
|
||||
self.assertEqual(x.processes, 5)
|
||||
self.assertEqual(x.process, 1)
|
||||
|
||||
def test_get_process_values_negative_process(self):
|
||||
vals = {
|
||||
@ -129,11 +133,13 @@ class TestObjectExpirer(TestCase):
|
||||
super(ObjectExpirer, self).__init__(conf)
|
||||
self.processes = 3
|
||||
self.deleted_objects = {}
|
||||
self.obj_containers_in_order = []
|
||||
|
||||
def delete_object(self, actual_obj, timestamp, container, obj):
|
||||
if container not in self.deleted_objects:
|
||||
self.deleted_objects[container] = set()
|
||||
self.deleted_objects[container].add(obj)
|
||||
self.obj_containers_in_order.append(container)
|
||||
|
||||
class InternalClient(object):
|
||||
|
||||
@ -172,6 +178,7 @@ class TestObjectExpirer(TestCase):
|
||||
self.assertEqual(containers[ukey].pop(),
|
||||
deleted_objects[ukey].pop().decode('utf8'))
|
||||
self.assertEqual(containers, deleted_objects)
|
||||
self.assertEqual(len(set(x.obj_containers_in_order[:4])), 4)
|
||||
|
||||
def test_delete_object(self):
|
||||
class InternalClient(object):
|
||||
@ -451,18 +458,19 @@ class TestObjectExpirer(TestCase):
|
||||
|
||||
fake_swift = InternalClient(
|
||||
[{'name': str(int(time() - 86400))}],
|
||||
[{'name': '%d-actual-obj' % int(time() - 86400)}])
|
||||
[{'name': '%d-acc/c/actual-obj' % int(time() - 86400)}])
|
||||
x = expirer.ObjectExpirer({}, logger=self.logger, swift=fake_swift)
|
||||
x.delete_actual_object = lambda o, t: None
|
||||
x.pop_queue = lambda c, o: None
|
||||
self.assertEqual(x.report_objects, 0)
|
||||
x.run_once()
|
||||
self.assertEqual(x.report_objects, 1)
|
||||
self.assertEqual(
|
||||
x.logger.log_dict['info'],
|
||||
[(('Pass beginning; 1 possible containers; '
|
||||
'2 possible objects',), {}),
|
||||
(('Pass completed in 0s; 1 objects expired',), {})])
|
||||
with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0):
|
||||
x.run_once()
|
||||
self.assertEqual(x.report_objects, 1)
|
||||
self.assertEqual(
|
||||
x.logger.log_dict['info'],
|
||||
[(('Pass beginning; 1 possible containers; '
|
||||
'2 possible objects',), {}),
|
||||
(('Pass completed in 0s; 1 objects expired',), {})])
|
||||
|
||||
def test_delete_actual_object_does_not_get_unicode(self):
|
||||
class InternalClient(object):
|
||||
|
Loading…
x
Reference in New Issue
Block a user