Split expirer methods and parametrize task account
To prepare for implement general task queue mode to expirer, this patch splits expirer's method into smaller ones and parametrize task account. This change will make expirer's general task queue patch [1] more simple. This patch has following approaches: 1: Split methods into smaller ones 2: Parameterize task account name to adapt many task accounts in general task queue 3: Include task account names in log messages 4: Skip task account when the account has no task containers [1]: https://review.openstack.org/#/c/517389/ Change-Id: I907612f7c258495e9ccc53c1d57de4791b3e7ab7
This commit is contained in:
parent
f1f8591c6a
commit
1fadffeae0
@ -39,8 +39,8 @@ MAX_OBJECTS_TO_CACHE = 100000
|
||||
|
||||
class ObjectExpirer(Daemon):
|
||||
"""
|
||||
Daemon that queries the internal hidden expiring_objects_account to
|
||||
discover objects that need to be deleted.
|
||||
Daemon that queries the internal hidden task accounts to discover objects
|
||||
that need to be deleted.
|
||||
|
||||
:param conf: The daemon configuration.
|
||||
"""
|
||||
@ -49,13 +49,9 @@ class ObjectExpirer(Daemon):
|
||||
self.conf = conf
|
||||
self.logger = logger or get_logger(conf, log_route='object-expirer')
|
||||
self.interval = int(conf.get('interval') or 300)
|
||||
self.expiring_objects_account = \
|
||||
(conf.get('auto_create_account_prefix') or '.') + \
|
||||
(conf.get('expiring_objects_account_name') or 'expiring_objects')
|
||||
conf_path = conf.get('__file__') or '/etc/swift/object-expirer.conf'
|
||||
request_tries = int(conf.get('request_tries') or 3)
|
||||
self.swift = swift or InternalClient(
|
||||
conf_path, 'Swift Object Expirer', request_tries)
|
||||
|
||||
self.read_conf_for_queue_access(swift)
|
||||
|
||||
self.report_interval = int(conf.get('report_interval') or 300)
|
||||
self.report_first_time = self.report_last_time = time()
|
||||
self.report_objects = 0
|
||||
@ -65,13 +61,29 @@ class ObjectExpirer(Daemon):
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
if self.concurrency < 1:
|
||||
raise ValueError("concurrency must be set to at least 1")
|
||||
self.processes = int(self.conf.get('processes', 0))
|
||||
self.process = int(self.conf.get('process', 0))
|
||||
# This option defines how long an un-processable expired object
|
||||
# marker will be retried before it is abandoned. It is not coupled
|
||||
# with the tombstone reclaim age in the consistency engine.
|
||||
self.reclaim_age = int(conf.get('reclaim_age', 604800))
|
||||
|
||||
def read_conf_for_queue_access(self, swift):
|
||||
self.expiring_objects_account = \
|
||||
(self.conf.get('auto_create_account_prefix') or '.') + \
|
||||
(self.conf.get('expiring_objects_account_name') or
|
||||
'expiring_objects')
|
||||
|
||||
# This is for common parameter with general task queue in future
|
||||
self.task_container_prefix = ''
|
||||
|
||||
self.ic_conf_path = \
|
||||
self.conf.get('__file__') or '/etc/swift/object-expirer.conf'
|
||||
request_tries = int(self.conf.get('request_tries') or 3)
|
||||
self.swift = swift or InternalClient(
|
||||
self.ic_conf_path, 'Swift Object Expirer', request_tries)
|
||||
|
||||
self.processes = int(self.conf.get('processes', 0))
|
||||
self.process = int(self.conf.get('process', 0))
|
||||
|
||||
def report(self, final=False):
|
||||
"""
|
||||
Emits a log line report of the progress so far, or the final progress
|
||||
@ -95,6 +107,20 @@ class ObjectExpirer(Daemon):
|
||||
'time': elapsed, 'objects': self.report_objects})
|
||||
self.report_last_time = time()
|
||||
|
||||
def parse_task_obj(self, task_obj):
|
||||
"""
|
||||
:param task_obj: a task object name in format of
|
||||
"<timestamp>-<target_account>/<target_container>" +
|
||||
"/<target_obj>"
|
||||
:return: 4-tuples of (delete_at_time, target_account, target_container,
|
||||
target_obj)
|
||||
"""
|
||||
timestamp, target_path = task_obj.split('-', 1)
|
||||
timestamp = Timestamp(timestamp)
|
||||
target_account, target_container, target_obj = \
|
||||
split_path('/' + target_path, 3, 3, True)
|
||||
return timestamp, target_account, target_container, target_obj
|
||||
|
||||
def round_robin_order(self, task_iter):
|
||||
"""
|
||||
Change order of expiration tasks to avoid deleting objects in a
|
||||
@ -119,6 +145,7 @@ class ObjectExpirer(Daemon):
|
||||
target_account, target_container, _junk = \
|
||||
split_path('/' + delete_task['target_path'], 3, 3, True)
|
||||
cache_key = '%s/%s' % (target_account, target_container)
|
||||
# sanity
|
||||
except ValueError:
|
||||
self.logger.exception('Unexcepted error handling task %r' %
|
||||
delete_task)
|
||||
@ -135,45 +162,82 @@ class ObjectExpirer(Daemon):
|
||||
for task in dump_obj_cache_in_round_robin():
|
||||
yield task
|
||||
|
||||
def iter_task_containers_to_expire(self):
|
||||
def hash_mod(self, name, divisor):
|
||||
"""
|
||||
Yields container name under the expiring_objects_account if
|
||||
the container name (i.e. timestamp) is past.
|
||||
:param name: a task object name
|
||||
:param divisor: a divisor number
|
||||
:return: an integer to decide which expirer is assigned to the task
|
||||
"""
|
||||
for c in self.swift.iter_containers(self.expiring_objects_account):
|
||||
# md5 is only used for shuffling mod
|
||||
return int(hashlib.md5(name).hexdigest(), 16) % divisor
|
||||
|
||||
def iter_task_accounts_to_expire(self):
|
||||
"""
|
||||
Yields (task_account, my_index, divisor).
|
||||
my_index and divisor is used to assign task obj to only one
|
||||
expirer. In expirer method, expirer calculates assigned index for each
|
||||
expiration task. The assigned index is in [0, 1, ..., divisor - 1].
|
||||
Expirers have their own "my_index" for each task_account. Expirer whose
|
||||
"my_index" is equal to the assigned index executes the task. Because
|
||||
each expirer have different "my_index", task objects are executed by
|
||||
only one expirer.
|
||||
"""
|
||||
if self.processes > 0:
|
||||
yield self.expiring_objects_account, self.process, self.processes
|
||||
else:
|
||||
yield self.expiring_objects_account, 0, 1
|
||||
|
||||
def delete_at_time_of_task_container(self, task_container):
|
||||
"""
|
||||
get delete_at timestamp from task_container name
|
||||
"""
|
||||
# task_container name is timestamp
|
||||
return Timestamp(task_container)
|
||||
|
||||
def iter_task_containers_to_expire(self, task_account):
|
||||
"""
|
||||
Yields task_container names under the task_account if the delete at
|
||||
timestamp of task_container is past.
|
||||
"""
|
||||
for c in self.swift.iter_containers(task_account,
|
||||
prefix=self.task_container_prefix):
|
||||
task_container = str(c['name'])
|
||||
timestamp = Timestamp(task_container)
|
||||
timestamp = self.delete_at_time_of_task_container(task_container)
|
||||
if timestamp > Timestamp.now():
|
||||
break
|
||||
yield task_container
|
||||
|
||||
def iter_task_to_expire(self, task_containers):
|
||||
def iter_task_to_expire(self, task_account_container_list,
|
||||
my_index, divisor):
|
||||
"""
|
||||
Yields task expire info dict which consists of task_container,
|
||||
target_path, timestamp_to_delete, and target_path
|
||||
Yields task expire info dict which consists of task_account,
|
||||
task_container, task_object, timestamp_to_delete, and target_path
|
||||
"""
|
||||
|
||||
for task_container in task_containers:
|
||||
for o in self.swift.iter_objects(self.expiring_objects_account,
|
||||
task_container):
|
||||
for task_account, task_container in task_account_container_list:
|
||||
for o in self.swift.iter_objects(task_account, task_container):
|
||||
task_object = o['name'].encode('utf8')
|
||||
delete_timestamp, target_path = task_object.split('-', 1)
|
||||
delete_timestamp = Timestamp(delete_timestamp)
|
||||
try:
|
||||
delete_timestamp, target_account, target_container, \
|
||||
target_object = self.parse_task_obj(task_object)
|
||||
except ValueError:
|
||||
self.logger.exception('Unexcepted error handling task %r' %
|
||||
task_object)
|
||||
continue
|
||||
if delete_timestamp > Timestamp.now():
|
||||
# we shouldn't yield the object that doesn't reach
|
||||
# the expiration date yet.
|
||||
break
|
||||
|
||||
if self.processes > 0:
|
||||
obj_process = int(
|
||||
hashlib.md5('%s/%s' % (task_container, task_object)).
|
||||
hexdigest(), 16)
|
||||
if obj_process % self.processes != self.process:
|
||||
continue
|
||||
# Only one expirer daemon assigned for one task
|
||||
if self.hash_mod('%s/%s' % (task_container, task_object),
|
||||
divisor) != my_index:
|
||||
continue
|
||||
|
||||
yield {'task_container': task_container,
|
||||
yield {'task_account': task_account,
|
||||
'task_container': task_container,
|
||||
'task_object': task_object,
|
||||
'target_path': target_path,
|
||||
'target_path': '/'.join([
|
||||
target_account, target_container, target_object]),
|
||||
'delete_timestamp': delete_timestamp}
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
@ -193,36 +257,55 @@ class ObjectExpirer(Daemon):
|
||||
self.report_objects = 0
|
||||
try:
|
||||
self.logger.debug('Run begin')
|
||||
containers, objects = \
|
||||
self.swift.get_account_info(self.expiring_objects_account)
|
||||
self.logger.info(_('Pass beginning; '
|
||||
'%(containers)s possible containers; '
|
||||
'%(objects)s possible objects') % {
|
||||
'containers': containers, 'objects': objects})
|
||||
task_account_container_list_to_delete = list()
|
||||
for task_account, my_index, divisor in \
|
||||
self.iter_task_accounts_to_expire():
|
||||
container_count, obj_count = \
|
||||
self.swift.get_account_info(task_account)
|
||||
|
||||
task_containers = list(self.iter_task_containers_to_expire())
|
||||
# the task account is skipped if there are no task container
|
||||
if not container_count:
|
||||
continue
|
||||
|
||||
# delete_task_iter is a generator to yield a dict of
|
||||
# task_container, task_object, delete_timestamp, target_path
|
||||
# to handle delete actual object and pop the task from the queue.
|
||||
delete_task_iter = self.round_robin_order(
|
||||
self.iter_task_to_expire(task_containers))
|
||||
self.logger.info(_(
|
||||
'Pass beginning for task account %(account)s; '
|
||||
'%(container_count)s possible containers; '
|
||||
'%(obj_count)s possible objects') % {
|
||||
'account': task_account,
|
||||
'container_count': container_count,
|
||||
'obj_count': obj_count})
|
||||
|
||||
for delete_task in delete_task_iter:
|
||||
pool.spawn_n(self.delete_object, **delete_task)
|
||||
task_account_container_list = \
|
||||
[(task_account, task_container) for task_container in
|
||||
self.iter_task_containers_to_expire(task_account)]
|
||||
|
||||
task_account_container_list_to_delete.extend(
|
||||
task_account_container_list)
|
||||
|
||||
# delete_task_iter is a generator to yield a dict of
|
||||
# task_account, task_container, task_object, delete_timestamp,
|
||||
# target_path to handle delete actual object and pop the task
|
||||
# from the queue.
|
||||
delete_task_iter = \
|
||||
self.round_robin_order(self.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor))
|
||||
|
||||
for delete_task in delete_task_iter:
|
||||
pool.spawn_n(self.delete_object, **delete_task)
|
||||
|
||||
pool.waitall()
|
||||
for container in task_containers:
|
||||
for task_account, task_container in \
|
||||
task_account_container_list_to_delete:
|
||||
try:
|
||||
self.swift.delete_container(
|
||||
self.expiring_objects_account,
|
||||
container,
|
||||
task_account, task_container,
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.exception(
|
||||
_('Exception while deleting container %(container)s '
|
||||
'%(err)s') % {'container': container,
|
||||
'err': str(err)})
|
||||
_('Exception while deleting container %(account)s '
|
||||
'%(container)s %(err)s') % {
|
||||
'account': task_account,
|
||||
'container': task_container, 'err': str(err)})
|
||||
self.logger.debug('Run end')
|
||||
self.report(final=True)
|
||||
except (Exception, Timeout):
|
||||
@ -277,7 +360,7 @@ class ObjectExpirer(Daemon):
|
||||
'process must be less than processes')
|
||||
|
||||
def delete_object(self, target_path, delete_timestamp,
|
||||
task_container, task_object):
|
||||
task_account, task_container, task_object):
|
||||
start_time = time()
|
||||
try:
|
||||
try:
|
||||
@ -289,33 +372,33 @@ class ObjectExpirer(Daemon):
|
||||
if float(delete_timestamp) > time() - self.reclaim_age:
|
||||
# we'll have to retry the DELETE later
|
||||
raise
|
||||
self.pop_queue(task_container, task_object)
|
||||
self.pop_queue(task_account, task_container, task_object)
|
||||
self.report_objects += 1
|
||||
self.logger.increment('objects')
|
||||
except UnexpectedResponse as err:
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(
|
||||
'Unexpected response while deleting object %(container)s '
|
||||
'%(obj)s: %(err)s' % {
|
||||
'container': task_container, 'obj': task_object,
|
||||
'err': str(err.resp.status_int)})
|
||||
'Unexpected response while deleting object '
|
||||
'%(account)s %(container)s %(obj)s: %(err)s' % {
|
||||
'account': task_account, 'container': task_container,
|
||||
'obj': task_object, 'err': str(err.resp.status_int)})
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.increment('errors')
|
||||
self.logger.exception(
|
||||
'Exception while deleting object %(container)s %(obj)s'
|
||||
' %(err)s' % {'container': task_container,
|
||||
'obj': task_object, 'err': str(err)})
|
||||
'Exception while deleting object %(account)s %(container)s '
|
||||
'%(obj)s %(err)s' % {
|
||||
'account': task_account, 'container': task_container,
|
||||
'obj': task_object, 'err': str(err)})
|
||||
self.logger.timing_since('timing', start_time)
|
||||
self.report()
|
||||
|
||||
def pop_queue(self, container, obj):
|
||||
def pop_queue(self, task_account, task_container, task_object):
|
||||
"""
|
||||
Issue a delete object request to the container for the expiring object
|
||||
queue entry.
|
||||
Issue a delete object request to the task_container for the expiring
|
||||
object queue entry.
|
||||
"""
|
||||
direct_delete_container_entry(self.swift.container_ring,
|
||||
self.expiring_objects_account,
|
||||
container, obj)
|
||||
direct_delete_container_entry(self.swift.container_ring, task_account,
|
||||
task_container, task_object)
|
||||
|
||||
def delete_actual_object(self, actual_obj, timestamp):
|
||||
"""
|
||||
|
@ -215,7 +215,7 @@ class TestObjectExpirer(TestCase):
|
||||
self.deleted_objects = {}
|
||||
|
||||
def delete_object(self, target_path, delete_timestamp,
|
||||
task_container, task_object):
|
||||
task_account, task_container, task_object):
|
||||
if task_container not in self.deleted_objects:
|
||||
self.deleted_objects[task_container] = set()
|
||||
self.deleted_objects[task_container].add(task_object)
|
||||
@ -249,6 +249,7 @@ class TestObjectExpirer(TestCase):
|
||||
actual_obj = 'actual_obj'
|
||||
timestamp = int(time())
|
||||
reclaim_ts = timestamp - x.reclaim_age
|
||||
account = 'account'
|
||||
container = 'container'
|
||||
obj = 'obj'
|
||||
|
||||
@ -266,12 +267,12 @@ class TestObjectExpirer(TestCase):
|
||||
with mock.patch.object(x, 'delete_actual_object',
|
||||
side_effect=exc) as delete_actual:
|
||||
with mock.patch.object(x, 'pop_queue') as pop_queue:
|
||||
x.delete_object(actual_obj, ts, container, obj)
|
||||
x.delete_object(actual_obj, ts, account, container, obj)
|
||||
|
||||
delete_actual.assert_called_once_with(actual_obj, ts)
|
||||
log_lines = x.logger.get_lines_for_level('error')
|
||||
if should_pop:
|
||||
pop_queue.assert_called_once_with(container, obj)
|
||||
pop_queue.assert_called_once_with(account, container, obj)
|
||||
self.assertEqual(start_reports + 1, x.report_objects)
|
||||
self.assertFalse(log_lines)
|
||||
else:
|
||||
@ -281,11 +282,12 @@ class TestObjectExpirer(TestCase):
|
||||
if isinstance(exc, internal_client.UnexpectedResponse):
|
||||
self.assertEqual(
|
||||
log_lines[0],
|
||||
'Unexpected response while deleting object container '
|
||||
'obj: %s' % exc.resp.status_int)
|
||||
'Unexpected response while deleting object '
|
||||
'account container obj: %s' % exc.resp.status_int)
|
||||
else:
|
||||
self.assertTrue(log_lines[0].startswith(
|
||||
'Exception while deleting object container obj'))
|
||||
'Exception while deleting object '
|
||||
'account container obj'))
|
||||
|
||||
# verify pop_queue logic on exceptions
|
||||
for exc, ts, should_pop in [(None, timestamp, True),
|
||||
@ -322,9 +324,27 @@ class TestObjectExpirer(TestCase):
|
||||
self.assertTrue(
|
||||
'so far' in str(x.logger.get_lines_for_level('info')))
|
||||
|
||||
def test_parse_task_obj(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
|
||||
def assert_parse_task_obj(task_obj, expected_delete_at,
|
||||
expected_account, expected_container,
|
||||
expected_obj):
|
||||
delete_at, account, container, obj = x.parse_task_obj(task_obj)
|
||||
self.assertEqual(delete_at, expected_delete_at)
|
||||
self.assertEqual(account, expected_account)
|
||||
self.assertEqual(container, expected_container)
|
||||
self.assertEqual(obj, expected_obj)
|
||||
|
||||
assert_parse_task_obj('0000-a/c/o', 0, 'a', 'c', 'o')
|
||||
assert_parse_task_obj('0001-a/c/o', 1, 'a', 'c', 'o')
|
||||
assert_parse_task_obj('1000-a/c/o', 1000, 'a', 'c', 'o')
|
||||
assert_parse_task_obj('0000-acc/con/obj', 0, 'acc', 'con', 'obj')
|
||||
|
||||
def test_round_robin_order(self):
|
||||
def make_task(delete_at, target):
|
||||
return {
|
||||
'task_account': '.expiring_objects',
|
||||
'task_container': delete_at,
|
||||
'task_object': delete_at + '-' + target,
|
||||
'delete_timestamp': Timestamp(delete_at),
|
||||
@ -439,6 +459,36 @@ class TestObjectExpirer(TestCase):
|
||||
# in which case, we kind of hammer the task containers
|
||||
self.assertEqual(task_con_obj_list, result)
|
||||
|
||||
def test_hash_mod(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
mod_count = [0, 0, 0]
|
||||
for i in range(1000):
|
||||
name = 'obj%d' % i
|
||||
mod = x.hash_mod(name, 3)
|
||||
mod_count[mod] += 1
|
||||
|
||||
# 1000 names are well shuffled
|
||||
self.assertGreater(mod_count[0], 300)
|
||||
self.assertGreater(mod_count[1], 300)
|
||||
self.assertGreater(mod_count[2], 300)
|
||||
|
||||
def test_iter_task_accounts_to_expire(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
results = [_ for _ in x.iter_task_accounts_to_expire()]
|
||||
self.assertEqual(results, [('.expiring_objects', 0, 1)])
|
||||
|
||||
self.conf['processes'] = '2'
|
||||
self.conf['process'] = '1'
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
results = [_ for _ in x.iter_task_accounts_to_expire()]
|
||||
self.assertEqual(results, [('.expiring_objects', 1, 2)])
|
||||
|
||||
def test_delete_at_time_of_task_container(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
self.assertEqual(x.delete_at_time_of_task_container('0000'), 0)
|
||||
self.assertEqual(x.delete_at_time_of_task_container('0001'), 1)
|
||||
self.assertEqual(x.delete_at_time_of_task_container('1000'), 1000)
|
||||
|
||||
def test_run_once_nothing_to_do(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
x.swift = 'throw error because a string does not have needed methods'
|
||||
@ -450,16 +500,97 @@ class TestObjectExpirer(TestCase):
|
||||
"'str' object has no attribute 'get_account_info'")
|
||||
|
||||
def test_run_once_calls_report(self):
|
||||
fake_swift = FakeInternalClient({})
|
||||
fake_swift = FakeInternalClient({
|
||||
'.expiring_objects': {u'1234': [u'1234-a/c/troms\xf8']}
|
||||
})
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
with mock.patch.object(x, 'pop_queue', lambda a, c, o: None):
|
||||
x.run_once()
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('info'), [
|
||||
'Pass beginning for task account .expiring_objects; '
|
||||
'1 possible containers; 1 possible objects',
|
||||
'Pass completed in 0s; 1 objects expired',
|
||||
])
|
||||
|
||||
def test_skip_task_account_without_task_container(self):
|
||||
fake_swift = FakeInternalClient({
|
||||
# task account has no containers
|
||||
'.expiring_objects': dict()
|
||||
})
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
x.run_once()
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('info'), [
|
||||
'Pass beginning; 0 possible containers; 0 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired',
|
||||
])
|
||||
|
||||
def test_iter_task_to_expire(self):
|
||||
fake_swift = FakeInternalClient({
|
||||
'.expiring_objects': {
|
||||
u'1234': ['1234-a0/c0/o0', '1234-a1/c1/o1'],
|
||||
u'2000': ['2000-a2/c2/o2', '2000-a3/c3/o3'],
|
||||
}
|
||||
})
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
|
||||
# In this test, all tasks are assigned to the tested expirer
|
||||
my_index = 0
|
||||
divisor = 1
|
||||
|
||||
task_account_container_list = [('.expiring_objects', u'1234'),
|
||||
('.expiring_objects', u'2000')]
|
||||
|
||||
expected = [{
|
||||
'task_account': '.expiring_objects',
|
||||
'task_container': u'1234',
|
||||
'task_object': '1234-a0/c0/o0',
|
||||
'target_path': 'a0/c0/o0',
|
||||
'delete_timestamp': Timestamp(1234),
|
||||
}, {
|
||||
'task_account': '.expiring_objects',
|
||||
'task_container': u'1234',
|
||||
'task_object': '1234-a1/c1/o1',
|
||||
'target_path': 'a1/c1/o1',
|
||||
'delete_timestamp': Timestamp(1234),
|
||||
}, {
|
||||
'task_account': '.expiring_objects',
|
||||
'task_container': u'2000',
|
||||
'task_object': '2000-a2/c2/o2',
|
||||
'target_path': 'a2/c2/o2',
|
||||
'delete_timestamp': Timestamp(2000),
|
||||
}, {
|
||||
'task_account': '.expiring_objects',
|
||||
'task_container': u'2000',
|
||||
'task_object': '2000-a3/c3/o3',
|
||||
'target_path': 'a3/c3/o3',
|
||||
'delete_timestamp': Timestamp(2000),
|
||||
}]
|
||||
|
||||
self.assertEqual(
|
||||
list(x.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
expected)
|
||||
|
||||
# the task queue has invalid task object
|
||||
fake_swift = FakeInternalClient({
|
||||
'.expiring_objects': {
|
||||
u'1234': ['1234-invalid', '1234-a0/c0/o0', '1234-a1/c1/o1'],
|
||||
u'2000': ['2000-a2/c2/o2', '2000-invalid', '2000-a3/c3/o3'],
|
||||
}
|
||||
})
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
|
||||
# but the invalid tasks are skipped
|
||||
self.assertEqual(
|
||||
list(x.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
expected)
|
||||
|
||||
def test_run_once_unicode_problem(self):
|
||||
fake_swift = FakeInternalClient({
|
||||
'.expiring_objects': {u'1234': [u'1234-a/c/troms\xf8']}
|
||||
@ -481,7 +612,7 @@ class TestObjectExpirer(TestCase):
|
||||
raise Exception('This should not have been called')
|
||||
|
||||
fake_swift = FakeInternalClient({
|
||||
'.expiring_objects': {str(int(time() + 86400)): []}
|
||||
'.expiring_objects': {str(int(time() + 86400)): ['1234-a/c/o']}
|
||||
})
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
@ -490,14 +621,15 @@ class TestObjectExpirer(TestCase):
|
||||
x.run_once()
|
||||
logs = x.logger.all_log_lines()
|
||||
self.assertEqual(logs['info'], [
|
||||
'Pass beginning; 1 possible containers; 0 possible objects',
|
||||
'Pass beginning for task account .expiring_objects; '
|
||||
'1 possible containers; 1 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired',
|
||||
])
|
||||
self.assertNotIn('error', logs)
|
||||
|
||||
# Reverse test to be sure it still would blow up the way expected.
|
||||
fake_swift = FakeInternalClient({
|
||||
'.expiring_objects': {str(int(time() - 86400)): []}
|
||||
'.expiring_objects': {str(int(time() - 86400)): ['1234-a/c/o']}
|
||||
})
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
@ -525,7 +657,8 @@ class TestObjectExpirer(TestCase):
|
||||
x.run_once()
|
||||
self.assertNotIn('error', x.logger.all_log_lines())
|
||||
self.assertEqual(x.logger.get_lines_for_level('info'), [
|
||||
'Pass beginning; 1 possible containers; 1 possible objects',
|
||||
'Pass beginning for task account .expiring_objects; '
|
||||
'1 possible containers; 1 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired',
|
||||
])
|
||||
# Reverse test to be sure it still would blow up the way expected.
|
||||
@ -541,14 +674,15 @@ class TestObjectExpirer(TestCase):
|
||||
x.run_once()
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('error'),
|
||||
['Exception while deleting object %d %d-a/c/actual-obj '
|
||||
'This should not have been called: ' % (ts, ts)])
|
||||
['Exception while deleting object .expiring_objects '
|
||||
'%d %d-a/c/actual-obj This should not have been called: ' %
|
||||
(ts, ts)])
|
||||
|
||||
def test_failed_delete_keeps_entry(self):
|
||||
def deliberately_blow_up(actual_obj, timestamp):
|
||||
raise Exception('failed to delete actual object')
|
||||
|
||||
def should_not_get_called(container, obj):
|
||||
def should_not_get_called(account, container, obj):
|
||||
raise Exception('This should not have been called')
|
||||
|
||||
ts = int(time() - 86400)
|
||||
@ -564,11 +698,13 @@ class TestObjectExpirer(TestCase):
|
||||
x.run_once()
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('error'),
|
||||
['Exception while deleting object %d %d-a/c/actual-obj '
|
||||
'failed to delete actual object: ' % (ts, ts)])
|
||||
['Exception while deleting object .expiring_objects '
|
||||
'%d %d-a/c/actual-obj failed to delete actual object: ' %
|
||||
(ts, ts)])
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('info'), [
|
||||
'Pass beginning; 1 possible containers; 1 possible objects',
|
||||
'Pass beginning for task account .expiring_objects; '
|
||||
'1 possible containers; 1 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired',
|
||||
])
|
||||
|
||||
@ -587,8 +723,9 @@ class TestObjectExpirer(TestCase):
|
||||
x.run_once()
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('error'),
|
||||
['Exception while deleting object %d %d-a/c/actual-obj This '
|
||||
'should not have been called: ' % (ts, ts)])
|
||||
['Exception while deleting object .expiring_objects '
|
||||
'%d %d-a/c/actual-obj This should not have been called: ' %
|
||||
(ts, ts)])
|
||||
|
||||
def test_success_gets_counted(self):
|
||||
fake_swift = FakeInternalClient({
|
||||
@ -600,14 +737,15 @@ class TestObjectExpirer(TestCase):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
x.delete_actual_object = lambda o, t: None
|
||||
x.pop_queue = lambda c, o: None
|
||||
x.pop_queue = lambda a, c, o: None
|
||||
self.assertEqual(x.report_objects, 0)
|
||||
with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0):
|
||||
x.run_once()
|
||||
self.assertEqual(x.report_objects, 1)
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('info'),
|
||||
['Pass beginning; 1 possible containers; 1 possible objects',
|
||||
['Pass beginning for task account .expiring_objects; '
|
||||
'1 possible containers; 1 possible objects',
|
||||
'Pass completed in 0s; 1 objects expired'])
|
||||
|
||||
def test_delete_actual_object_does_not_get_unicode(self):
|
||||
@ -626,13 +764,14 @@ class TestObjectExpirer(TestCase):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
x.delete_actual_object = delete_actual_object_test_for_unicode
|
||||
x.pop_queue = lambda c, o: None
|
||||
x.pop_queue = lambda a, c, o: None
|
||||
self.assertEqual(x.report_objects, 0)
|
||||
x.run_once()
|
||||
self.assertEqual(x.report_objects, 1)
|
||||
self.assertEqual(
|
||||
x.logger.get_lines_for_level('info'), [
|
||||
'Pass beginning; 1 possible containers; 1 possible objects',
|
||||
'Pass beginning for task account .expiring_objects; '
|
||||
'1 possible containers; 1 possible objects',
|
||||
'Pass completed in 0s; 1 objects expired',
|
||||
])
|
||||
self.assertFalse(got_unicode[0])
|
||||
@ -663,20 +802,23 @@ class TestObjectExpirer(TestCase):
|
||||
x.run_once()
|
||||
error_lines = x.logger.get_lines_for_level('error')
|
||||
self.assertEqual(sorted(error_lines), sorted([
|
||||
'Exception while deleting object %d %d-a/c/actual-obj failed to '
|
||||
'delete actual object: ' % (cts, ots),
|
||||
'Exception while deleting object %d %d-a/c/next-obj failed to '
|
||||
'delete actual object: ' % (cts, ots),
|
||||
'Exception while deleting object %d %d-a/c/actual-obj failed to '
|
||||
'delete actual object: ' % (cts + 1, ots),
|
||||
'Exception while deleting object %d %d-a/c/next-obj failed to '
|
||||
'delete actual object: ' % (cts + 1, ots),
|
||||
'Exception while deleting container %d failed to delete '
|
||||
'container: ' % (cts,),
|
||||
'Exception while deleting container %d failed to delete '
|
||||
'container: ' % (cts + 1,)]))
|
||||
'Exception while deleting object .expiring_objects %d '
|
||||
'%d-a/c/actual-obj failed to delete actual object: ' % (cts, ots),
|
||||
'Exception while deleting object .expiring_objects %d '
|
||||
'%d-a/c/next-obj failed to delete actual object: ' % (cts, ots),
|
||||
'Exception while deleting object .expiring_objects %d '
|
||||
'%d-a/c/actual-obj failed to delete actual object: ' %
|
||||
(cts + 1, ots),
|
||||
'Exception while deleting object .expiring_objects %d '
|
||||
'%d-a/c/next-obj failed to delete actual object: ' %
|
||||
(cts + 1, ots),
|
||||
'Exception while deleting container .expiring_objects %d '
|
||||
'failed to delete container: ' % (cts,),
|
||||
'Exception while deleting container .expiring_objects %d '
|
||||
'failed to delete container: ' % (cts + 1,)]))
|
||||
self.assertEqual(x.logger.get_lines_for_level('info'), [
|
||||
'Pass beginning; 2 possible containers; 4 possible objects',
|
||||
'Pass beginning for task account .expiring_objects; '
|
||||
'2 possible containers; 4 possible objects',
|
||||
'Pass completed in 0s; 0 objects expired',
|
||||
])
|
||||
|
||||
@ -848,13 +990,13 @@ class TestObjectExpirer(TestCase):
|
||||
requests.append((method, path))
|
||||
with mocked_http_conn(
|
||||
200, 200, 200, give_connect=capture_requests) as fake_conn:
|
||||
x.pop_queue('c', 'o')
|
||||
x.pop_queue('a', 'c', 'o')
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
for method, path in requests:
|
||||
self.assertEqual(method, 'DELETE')
|
||||
device, part, account, container, obj = utils.split_path(
|
||||
path, 5, 5, True)
|
||||
self.assertEqual(account, '.expiring_objects')
|
||||
self.assertEqual(account, 'a')
|
||||
self.assertEqual(container, 'c')
|
||||
self.assertEqual(obj, 'o')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user