Merge "Object-expirer: continue to process next container on listing errors"
This commit is contained in:
commit
4098e075f5
@ -328,6 +328,70 @@ class ObjectExpirer(Daemon):
|
||||
return get_delay_reaping(self.delay_reaping_times, target_account,
|
||||
target_container)
|
||||
|
||||
def _iter_task_container(self, task_account, task_container,
|
||||
my_index, divisor):
|
||||
"""
|
||||
Iterates the input task container, yields a task expire info dict for
|
||||
each delete task if it is assigned to this expirer process.
|
||||
|
||||
:raises UnexpectedResponse: if the task container listing is not
|
||||
successful.
|
||||
"""
|
||||
container_empty = True
|
||||
for o in self.swift.iter_objects(task_account,
|
||||
task_container,
|
||||
acceptable_statuses=[2]):
|
||||
container_empty = False
|
||||
task_object = o['name'].encode('utf8') if six.PY2 else o['name']
|
||||
try:
|
||||
delete_timestamp, target_account, target_container, \
|
||||
target_object = parse_task_obj(task_object)
|
||||
except ValueError:
|
||||
self.logger.exception('Unexcepted error handling task %r' %
|
||||
task_object)
|
||||
self.logger.increment('tasks.parse_errors')
|
||||
continue
|
||||
is_async = o.get('content_type') == ASYNC_DELETE_TYPE
|
||||
delay_reaping = self.get_delay_reaping(target_account,
|
||||
target_container)
|
||||
|
||||
if delete_timestamp > Timestamp.now():
|
||||
# we shouldn't yield ANY more objects that can't reach
|
||||
# the expiration date yet.
|
||||
break
|
||||
|
||||
# Only one expirer daemon assigned for each task
|
||||
if self.hash_mod('%s/%s' % (task_container, task_object),
|
||||
divisor) != my_index:
|
||||
self.logger.increment('tasks.skipped')
|
||||
continue
|
||||
|
||||
if delete_timestamp > Timestamp(time() - delay_reaping) \
|
||||
and not is_async:
|
||||
# we shouldn't yield the object during the delay
|
||||
self.logger.increment('tasks.delayed')
|
||||
continue
|
||||
|
||||
self.logger.increment('tasks.assigned')
|
||||
yield {'task_account': task_account,
|
||||
'task_container': task_container,
|
||||
'task_object': task_object,
|
||||
'target_path': '/'.join([
|
||||
target_account, target_container, target_object]),
|
||||
'delete_timestamp': delete_timestamp,
|
||||
'is_async_delete': is_async}
|
||||
if container_empty:
|
||||
try:
|
||||
self.swift.delete_container(
|
||||
task_account, task_container,
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.exception(
|
||||
'Exception while deleting container %(account)s '
|
||||
'%(container)s %(err)s', {
|
||||
'account': task_account,
|
||||
'container': task_container, 'err': str(err)})
|
||||
|
||||
def iter_task_to_expire(self, task_account_container_list,
|
||||
my_index, divisor):
|
||||
"""
|
||||
@ -335,61 +399,27 @@ class ObjectExpirer(Daemon):
|
||||
task_container, task_object, timestamp_to_delete, and target_path
|
||||
"""
|
||||
for task_account, task_container in task_account_container_list:
|
||||
container_empty = True
|
||||
for o in self.swift.iter_objects(task_account, task_container):
|
||||
container_empty = False
|
||||
if six.PY2:
|
||||
task_object = o['name'].encode('utf8')
|
||||
else:
|
||||
task_object = o['name']
|
||||
try:
|
||||
delete_timestamp, target_account, target_container, \
|
||||
target_object = parse_task_obj(task_object)
|
||||
except ValueError:
|
||||
self.logger.exception('Unexcepted error handling task %r' %
|
||||
task_object)
|
||||
self.logger.increment('tasks.parse_errors')
|
||||
continue
|
||||
is_async = o.get('content_type') == ASYNC_DELETE_TYPE
|
||||
delay_reaping = self.get_delay_reaping(target_account,
|
||||
target_container)
|
||||
|
||||
if delete_timestamp > Timestamp.now():
|
||||
# we shouldn't yield ANY more objects that can't reach
|
||||
# the expiration date yet.
|
||||
break
|
||||
|
||||
# Only one expirer daemon assigned for each task
|
||||
if self.hash_mod('%s/%s' % (task_container, task_object),
|
||||
divisor) != my_index:
|
||||
self.logger.increment('tasks.skipped')
|
||||
continue
|
||||
|
||||
if delete_timestamp > Timestamp(time() - delay_reaping) \
|
||||
and not is_async:
|
||||
# we shouldn't yield the object during the delay
|
||||
self.logger.increment('tasks.delayed')
|
||||
continue
|
||||
|
||||
self.logger.increment('tasks.assigned')
|
||||
yield {'task_account': task_account,
|
||||
'task_container': task_container,
|
||||
'task_object': task_object,
|
||||
'target_path': '/'.join([
|
||||
target_account, target_container, target_object]),
|
||||
'delete_timestamp': delete_timestamp,
|
||||
'is_async_delete': is_async}
|
||||
if container_empty:
|
||||
try:
|
||||
self.swift.delete_container(
|
||||
task_account, task_container,
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.exception(
|
||||
'Exception while deleting container %(account)s '
|
||||
'%(container)s %(err)s', {
|
||||
try:
|
||||
for item in self._iter_task_container(
|
||||
task_account, task_container, my_index, divisor):
|
||||
yield item
|
||||
except UnexpectedResponse as err:
|
||||
if err.resp.status_int != 404:
|
||||
self.logger.error(
|
||||
'Unexpected response while listing objects in '
|
||||
'container %(account)s %(container)s: %(err)s', {
|
||||
'account': task_account,
|
||||
'container': task_container, 'err': str(err)})
|
||||
'container': task_container,
|
||||
'err': str(err)
|
||||
})
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.error(
|
||||
'Exception while listing objects in container %(account)s '
|
||||
'%(container)s: %(err)s', {
|
||||
'account': task_account,
|
||||
'container': task_container,
|
||||
'err': str(err)
|
||||
})
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""
|
||||
|
@ -31,6 +31,7 @@ from six.moves import urllib
|
||||
|
||||
from swift.common import internal_client, utils, swob
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.swob import Response
|
||||
from swift.obj import expirer, diskfile
|
||||
|
||||
|
||||
@ -83,7 +84,7 @@ class FakeInternalClient(object):
|
||||
def delete_container(*a, **kw):
|
||||
pass
|
||||
|
||||
def iter_objects(self, account, container):
|
||||
def iter_objects(self, account, container, **kwargs):
|
||||
acc_dict = self.aco_dict[account]
|
||||
obj_iter = acc_dict.get(container, [])
|
||||
resp = []
|
||||
@ -1580,6 +1581,172 @@ class TestObjectExpirer(TestCase):
|
||||
task_account_container_list, 0, 1))
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def test_iter_task_to_expire_unexpected_response(self):
|
||||
# Test that object listing on the first container returns 503 and raise
|
||||
# UnexpectedResponse, and expect the second task container will
|
||||
# continue to be processed.
|
||||
# In this test, all tasks are assigned to the tested expirer.
|
||||
my_index = 0
|
||||
divisor = 1
|
||||
|
||||
# Store reference to the real method before mocking
|
||||
real_iter_objects = self.fake_swift.iter_objects
|
||||
|
||||
def mock_iter_objects(account, container, **kwargs):
|
||||
if container == self.just_past_time_container:
|
||||
mock_resp = Response(status=503)
|
||||
raise internal_client.UnexpectedResponse(
|
||||
'Mocked error', mock_resp)
|
||||
return real_iter_objects(account, container)
|
||||
|
||||
task_account_container_list = [
|
||||
('.expiring_objects', self.just_past_time_container),
|
||||
('.expiring_objects', self.past_time_container)
|
||||
]
|
||||
expected = [
|
||||
self.make_task(self.past_time_container, self.past_time,
|
||||
target_path)
|
||||
for target_path in self.expired_target_paths[self.past_time]]
|
||||
|
||||
with mock.patch.object(self.expirer.swift, 'iter_objects',
|
||||
mock_iter_objects):
|
||||
with mock.patch.object(self.expirer.swift, 'delete_container') \
|
||||
as mock_delete_container:
|
||||
self.assertEqual(
|
||||
list(self.expirer.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
expected)
|
||||
self.assertEqual(mock_delete_container.mock_calls, [])
|
||||
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
log_lines,
|
||||
['Unexpected response while listing objects in container '
|
||||
'.expiring_objects %s: Mocked error'
|
||||
% self.just_past_time_container]
|
||||
)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 5},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
def test_iter_task_to_expire_exception(self):
|
||||
# Test that object listing on the first container raise Exception, and
|
||||
# expect the second task container will continue to be processed.
|
||||
# In this test, all tasks are assigned to the tested expirer.
|
||||
my_index = 0
|
||||
divisor = 1
|
||||
|
||||
# Store reference to the real method before mocking
|
||||
real_iter_objects = self.fake_swift.iter_objects
|
||||
|
||||
def mock_iter_objects(account, container, **kwargs):
|
||||
if container == self.just_past_time_container:
|
||||
raise Exception('failed to connect')
|
||||
return real_iter_objects(account, container)
|
||||
|
||||
task_account_container_list = [
|
||||
('.expiring_objects', self.just_past_time_container),
|
||||
('.expiring_objects', self.past_time_container)
|
||||
]
|
||||
expected = [
|
||||
self.make_task(self.past_time_container, self.past_time,
|
||||
target_path)
|
||||
for target_path in self.expired_target_paths[self.past_time]]
|
||||
|
||||
with mock.patch.object(self.expirer.swift, 'iter_objects',
|
||||
mock_iter_objects):
|
||||
with mock.patch.object(self.expirer.swift, 'delete_container') \
|
||||
as mock_delete_container:
|
||||
self.assertEqual(
|
||||
list(self.expirer.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
expected)
|
||||
self.assertEqual(mock_delete_container.mock_calls, [])
|
||||
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
log_lines,
|
||||
['Exception while listing objects in container '
|
||||
'.expiring_objects %s: failed to connect'
|
||||
% self.just_past_time_container]
|
||||
)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 5},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
def test_iter_task_to_expire_404_response_on_empty_container(self):
|
||||
# Test that object listing on an empty container returns 404 and
|
||||
# raise UnexpectedResponse, and expect ``iter_task_to_expire`` won't
|
||||
# delete this task container.
|
||||
# In this test, all tasks are assigned to the tested expirer.
|
||||
my_index = 0
|
||||
divisor = 1
|
||||
|
||||
err_resp = Response(status=404)
|
||||
err = internal_client.UnexpectedResponse('Mocked error', err_resp)
|
||||
|
||||
task_account_container_list = [
|
||||
('.expiring_objects', self.empty_time_container)
|
||||
]
|
||||
|
||||
with mock.patch.object(self.expirer.swift, 'iter_objects',
|
||||
side_effect=err) as mock_method:
|
||||
with mock.patch.object(self.expirer.swift, 'delete_container') \
|
||||
as mock_delete_container:
|
||||
self.assertEqual(
|
||||
list(self.expirer.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
[])
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertFalse(log_lines)
|
||||
# This empty task container won't get deleted.
|
||||
self.assertEqual(mock_delete_container.mock_calls, [])
|
||||
self.assertEqual(
|
||||
{}, self.expirer.logger.statsd_client.get_increment_counts())
|
||||
self.assertEqual(mock_method.call_args_list, [
|
||||
mock.call('.expiring_objects',
|
||||
self.empty_time_container,
|
||||
acceptable_statuses=[2])
|
||||
])
|
||||
|
||||
def test_iter_task_to_expire_503_response_on_empty_container(self):
|
||||
# Test that object listing on an empty container returns 503 and
|
||||
# raise UnexpectedResponse, and expect ``iter_task_to_expire`` won't
|
||||
# delete this task container.
|
||||
# In this test, all tasks are assigned to the tested expirer.
|
||||
my_index = 0
|
||||
divisor = 1
|
||||
|
||||
def mock_iter_objects(account, container, **kwargs):
|
||||
mock_resp = Response(status=503)
|
||||
raise internal_client.UnexpectedResponse('Mocked error', mock_resp)
|
||||
|
||||
task_account_container_list = [
|
||||
('.expiring_objects', self.empty_time_container)
|
||||
]
|
||||
|
||||
with mock.patch.object(self.expirer.swift, 'iter_objects',
|
||||
side_effect=mock_iter_objects):
|
||||
with mock.patch.object(self.expirer.swift, 'delete_container') \
|
||||
as mock_delete_container:
|
||||
self.assertEqual(
|
||||
list(self.expirer.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
[])
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
log_lines[0],
|
||||
'Unexpected response while listing objects in container '
|
||||
'.expiring_objects %s: Mocked error'
|
||||
% self.empty_time_container,
|
||||
)
|
||||
# This empty task container won't get deleted.
|
||||
self.assertEqual(mock_delete_container.mock_calls, [])
|
||||
self.assertEqual(
|
||||
{}, self.expirer.logger.statsd_client.get_increment_counts())
|
||||
|
||||
def test_run_once_unicode_problem(self):
|
||||
requests = []
|
||||
|
||||
@ -1598,9 +1765,15 @@ class TestObjectExpirer(TestCase):
|
||||
|
||||
# iter_objects is called only for past_time, not future_time
|
||||
self.assertEqual(mock_method.call_args_list, [
|
||||
mock.call('.expiring_objects', self.empty_time_container),
|
||||
mock.call('.expiring_objects', self.past_time_container),
|
||||
mock.call('.expiring_objects', self.just_past_time_container)])
|
||||
mock.call('.expiring_objects',
|
||||
self.empty_time_container,
|
||||
acceptable_statuses=[2]),
|
||||
mock.call('.expiring_objects',
|
||||
self.past_time_container,
|
||||
acceptable_statuses=[2]),
|
||||
mock.call('.expiring_objects',
|
||||
self.just_past_time_container,
|
||||
acceptable_statuses=[2])])
|
||||
|
||||
def test_object_timestamp_break(self):
|
||||
with mock.patch.object(self.expirer, 'delete_actual_object') \
|
||||
|
Loading…
x
Reference in New Issue
Block a user