Object-expirer: continue to process next container on listing errors
When the expirer is iterating task containers and doing listings, it's possible that one of the container nodes hosting the task container becomes overloaded (or has a really low backend ratelimit set). The object-expirer should expect UnexpectedResponse and continue on to list the task objects in the next container. And if the task container doesn't exist, the expirer should not try to delete the non-existent container before continuing to work on the next container. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Change-Id: Id1966fa22725a02471e2d7c5a42fb243b1cfcf6a
This commit is contained in:
parent
44653d8efb
commit
7980b6a0d3
@ -328,6 +328,70 @@ class ObjectExpirer(Daemon):
|
||||
return get_delay_reaping(self.delay_reaping_times, target_account,
|
||||
target_container)
|
||||
|
||||
def _iter_task_container(self, task_account, task_container,
                         my_index, divisor):
    """
    Walk the object listing of a single task container and yield a task
    expire info dict for every delete task this expirer process should
    handle now.

    Iteration stops at the first task whose delete timestamp is still in
    the future. When the listing produced no objects at all, a delete of
    the task container is attempted; any failure of that delete is logged
    and not propagated.

    :param task_account: account that holds the task container
    :param task_container: name of the task container to list
    :param my_index: value the task's ``hash_mod`` result must equal for
                     the task to be handled by this process
    :param divisor: divisor handed to ``hash_mod``
    :raises UnexpectedResponse: if the task container listing is not
                                successful.
    """
    listing_was_empty = True
    # Only 2xx listing responses are acceptable here.
    listing = self.swift.iter_objects(
        task_account, task_container, acceptable_statuses=[2])
    for entry in listing:
        listing_was_empty = False
        if six.PY2:
            task_object = entry['name'].encode('utf8')
        else:
            task_object = entry['name']

        try:
            (delete_timestamp, target_account, target_container,
             target_object) = parse_task_obj(task_object)
        except ValueError:
            self.logger.exception('Unexcepted error handling task %r' %
                                  task_object)
            self.logger.increment('tasks.parse_errors')
            continue

        is_async = entry.get('content_type') == ASYNC_DELETE_TYPE
        delay_reaping = self.get_delay_reaping(target_account,
                                               target_container)

        if delete_timestamp > Timestamp.now():
            # Nothing from here on is yielded: this task has not reached
            # its expiration date yet, so stop listing this container.
            break

        # Each task is assigned to exactly one expirer daemon.
        task_key = '%s/%s' % (task_container, task_object)
        if self.hash_mod(task_key, divisor) != my_index:
            self.logger.increment('tasks.skipped')
            continue

        reap_cutoff = Timestamp(time() - delay_reaping)
        if delete_timestamp > reap_cutoff and not is_async:
            # Still inside the reaping delay; do not yield it this pass.
            self.logger.increment('tasks.delayed')
            continue

        self.logger.increment('tasks.assigned')
        target_path = '/'.join([
            target_account, target_container, target_object])
        yield {'task_account': task_account,
               'task_container': task_container,
               'task_object': task_object,
               'target_path': target_path,
               'delete_timestamp': delete_timestamp,
               'is_async_delete': is_async}

    if not listing_was_empty:
        return

    # The listing succeeded but held no objects: try to remove the
    # container, tolerating "not found" and "conflict" responses.
    try:
        self.swift.delete_container(
            task_account, task_container,
            acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
    except (Exception, Timeout) as err:
        self.logger.exception(
            'Exception while deleting container %(account)s '
            '%(container)s %(err)s', {
                'account': task_account,
                'container': task_container, 'err': str(err)})
|
||||
|
||||
def iter_task_to_expire(self, task_account_container_list,
|
||||
my_index, divisor):
|
||||
"""
|
||||
@ -335,61 +399,27 @@ class ObjectExpirer(Daemon):
|
||||
task_container, task_object, timestamp_to_delete, and target_path
|
||||
"""
|
||||
for task_account, task_container in task_account_container_list:
|
||||
container_empty = True
|
||||
for o in self.swift.iter_objects(task_account, task_container):
|
||||
container_empty = False
|
||||
if six.PY2:
|
||||
task_object = o['name'].encode('utf8')
|
||||
else:
|
||||
task_object = o['name']
|
||||
try:
|
||||
delete_timestamp, target_account, target_container, \
|
||||
target_object = parse_task_obj(task_object)
|
||||
except ValueError:
|
||||
self.logger.exception('Unexcepted error handling task %r' %
|
||||
task_object)
|
||||
self.logger.increment('tasks.parse_errors')
|
||||
continue
|
||||
is_async = o.get('content_type') == ASYNC_DELETE_TYPE
|
||||
delay_reaping = self.get_delay_reaping(target_account,
|
||||
target_container)
|
||||
|
||||
if delete_timestamp > Timestamp.now():
|
||||
# we shouldn't yield ANY more objects that can't reach
|
||||
# the expiration date yet.
|
||||
break
|
||||
|
||||
# Only one expirer daemon assigned for each task
|
||||
if self.hash_mod('%s/%s' % (task_container, task_object),
|
||||
divisor) != my_index:
|
||||
self.logger.increment('tasks.skipped')
|
||||
continue
|
||||
|
||||
if delete_timestamp > Timestamp(time() - delay_reaping) \
|
||||
and not is_async:
|
||||
# we shouldn't yield the object during the delay
|
||||
self.logger.increment('tasks.delayed')
|
||||
continue
|
||||
|
||||
self.logger.increment('tasks.assigned')
|
||||
yield {'task_account': task_account,
|
||||
'task_container': task_container,
|
||||
'task_object': task_object,
|
||||
'target_path': '/'.join([
|
||||
target_account, target_container, target_object]),
|
||||
'delete_timestamp': delete_timestamp,
|
||||
'is_async_delete': is_async}
|
||||
if container_empty:
|
||||
try:
|
||||
self.swift.delete_container(
|
||||
task_account, task_container,
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.exception(
|
||||
'Exception while deleting container %(account)s '
|
||||
'%(container)s %(err)s', {
|
||||
try:
|
||||
for item in self._iter_task_container(
|
||||
task_account, task_container, my_index, divisor):
|
||||
yield item
|
||||
except UnexpectedResponse as err:
|
||||
if err.resp.status_int != 404:
|
||||
self.logger.error(
|
||||
'Unexpected response while listing objects in '
|
||||
'container %(account)s %(container)s: %(err)s', {
|
||||
'account': task_account,
|
||||
'container': task_container, 'err': str(err)})
|
||||
'container': task_container,
|
||||
'err': str(err)
|
||||
})
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.error(
|
||||
'Exception while listing objects in container %(account)s '
|
||||
'%(container)s: %(err)s', {
|
||||
'account': task_account,
|
||||
'container': task_container,
|
||||
'err': str(err)
|
||||
})
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""
|
||||
|
@ -31,6 +31,7 @@ from six.moves import urllib
|
||||
|
||||
from swift.common import internal_client, utils, swob
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.swob import Response
|
||||
from swift.obj import expirer, diskfile
|
||||
|
||||
|
||||
@ -83,7 +84,7 @@ class FakeInternalClient(object):
|
||||
def delete_container(*a, **kw):
    # Test stub: accepts any arguments and deliberately does nothing.
    return None
|
||||
|
||||
def iter_objects(self, account, container):
|
||||
def iter_objects(self, account, container, **kwargs):
|
||||
acc_dict = self.aco_dict[account]
|
||||
obj_iter = acc_dict.get(container, [])
|
||||
resp = []
|
||||
@ -1580,6 +1581,172 @@ class TestObjectExpirer(TestCase):
|
||||
task_account_container_list, 0, 1))
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def test_iter_task_to_expire_unexpected_response(self):
    # Listing the first task container raises UnexpectedResponse (503);
    # the second task container must still be processed.
    # Every task is assigned to the expirer under test.
    my_index, divisor = 0, 1
    failing_container = self.just_past_time_container

    # Grab the un-patched listing method before it gets mocked out.
    original_iter_objects = self.fake_swift.iter_objects

    def mock_iter_objects(account, container, **kwargs):
        if container != failing_container:
            return original_iter_objects(account, container)
        raise internal_client.UnexpectedResponse(
            'Mocked error', Response(status=503))

    task_account_container_list = [
        ('.expiring_objects', self.just_past_time_container),
        ('.expiring_objects', self.past_time_container),
    ]
    expected = []
    for target_path in self.expired_target_paths[self.past_time]:
        expected.append(self.make_task(
            self.past_time_container, self.past_time, target_path))

    with mock.patch.object(self.expirer.swift, 'iter_objects',
                           mock_iter_objects), \
            mock.patch.object(self.expirer.swift, 'delete_container') \
            as mock_delete_container:
        self.assertEqual(
            list(self.expirer.iter_task_to_expire(
                task_account_container_list, my_index, divisor)),
            expected)
    # Neither container may have been deleted.
    self.assertEqual(mock_delete_container.mock_calls, [])

    expected_error = (
        'Unexpected response while listing objects in container '
        '.expiring_objects %s: Mocked error'
        % self.just_past_time_container)
    self.assertEqual(
        self.logger.get_lines_for_level('error'), [expected_error])
    self.assertEqual(
        {'tasks.assigned': 5},
        self.expirer.logger.statsd_client.get_increment_counts()
    )
|
||||
|
||||
def test_iter_task_to_expire_exception(self):
    # Listing the first task container raises a generic Exception; the
    # second task container must still be processed.
    # Every task is assigned to the expirer under test.
    my_index, divisor = 0, 1
    failing_container = self.just_past_time_container

    # Grab the un-patched listing method before it gets mocked out.
    original_iter_objects = self.fake_swift.iter_objects

    def mock_iter_objects(account, container, **kwargs):
        if container != failing_container:
            return original_iter_objects(account, container)
        raise Exception('failed to connect')

    task_account_container_list = [
        ('.expiring_objects', self.just_past_time_container),
        ('.expiring_objects', self.past_time_container),
    ]
    expected = []
    for target_path in self.expired_target_paths[self.past_time]:
        expected.append(self.make_task(
            self.past_time_container, self.past_time, target_path))

    with mock.patch.object(self.expirer.swift, 'iter_objects',
                           mock_iter_objects), \
            mock.patch.object(self.expirer.swift, 'delete_container') \
            as mock_delete_container:
        self.assertEqual(
            list(self.expirer.iter_task_to_expire(
                task_account_container_list, my_index, divisor)),
            expected)
    # Neither container may have been deleted.
    self.assertEqual(mock_delete_container.mock_calls, [])

    expected_error = (
        'Exception while listing objects in container '
        '.expiring_objects %s: failed to connect'
        % self.just_past_time_container)
    self.assertEqual(
        self.logger.get_lines_for_level('error'), [expected_error])
    self.assertEqual(
        {'tasks.assigned': 5},
        self.expirer.logger.statsd_client.get_increment_counts()
    )
|
||||
|
||||
def test_iter_task_to_expire_404_response_on_empty_container(self):
    # The listing of the task container raises UnexpectedResponse with a
    # 404 status; ``iter_task_to_expire`` must neither log an error nor
    # try to delete that task container.
    # Every task is assigned to the expirer under test.
    my_index, divisor = 0, 1
    not_found = internal_client.UnexpectedResponse(
        'Mocked error', Response(status=404))
    task_account_container_list = [
        ('.expiring_objects', self.empty_time_container),
    ]

    with mock.patch.object(self.expirer.swift, 'iter_objects',
                           side_effect=not_found) as mock_method, \
            mock.patch.object(self.expirer.swift, 'delete_container') \
            as mock_delete_container:
        self.assertEqual(
            list(self.expirer.iter_task_to_expire(
                task_account_container_list, my_index, divisor)),
            [])

    # A 404 is not reported at error level.
    self.assertFalse(self.logger.get_lines_for_level('error'))
    # The task container is left alone.
    self.assertEqual(mock_delete_container.mock_calls, [])
    self.assertEqual(
        {}, self.expirer.logger.statsd_client.get_increment_counts())
    # The listing was requested once, accepting only 2xx responses.
    expected_calls = [
        mock.call('.expiring_objects',
                  self.empty_time_container,
                  acceptable_statuses=[2]),
    ]
    self.assertEqual(mock_method.call_args_list, expected_calls)
|
||||
|
||||
def test_iter_task_to_expire_503_response_on_empty_container(self):
    # The listing of the task container raises UnexpectedResponse with a
    # 503 status; ``iter_task_to_expire`` must log the error and must not
    # try to delete that task container.
    # Every task is assigned to the expirer under test.
    my_index, divisor = 0, 1

    def mock_iter_objects(account, container, **kwargs):
        raise internal_client.UnexpectedResponse(
            'Mocked error', Response(status=503))

    task_account_container_list = [
        ('.expiring_objects', self.empty_time_container),
    ]

    with mock.patch.object(self.expirer.swift, 'iter_objects',
                           side_effect=mock_iter_objects), \
            mock.patch.object(self.expirer.swift, 'delete_container') \
            as mock_delete_container:
        self.assertEqual(
            list(self.expirer.iter_task_to_expire(
                task_account_container_list, my_index, divisor)),
            [])

    error_lines = self.logger.get_lines_for_level('error')
    expected_error = (
        'Unexpected response while listing objects in container '
        '.expiring_objects %s: Mocked error'
        % self.empty_time_container)
    self.assertEqual(error_lines[0], expected_error)
    # The task container is left alone.
    self.assertEqual(mock_delete_container.mock_calls, [])
    self.assertEqual(
        {}, self.expirer.logger.statsd_client.get_increment_counts())
|
||||
|
||||
def test_run_once_unicode_problem(self):
|
||||
requests = []
|
||||
|
||||
@ -1598,9 +1765,15 @@ class TestObjectExpirer(TestCase):
|
||||
|
||||
# iter_objects is called only for past_time, not future_time
|
||||
self.assertEqual(mock_method.call_args_list, [
|
||||
mock.call('.expiring_objects', self.empty_time_container),
|
||||
mock.call('.expiring_objects', self.past_time_container),
|
||||
mock.call('.expiring_objects', self.just_past_time_container)])
|
||||
mock.call('.expiring_objects',
|
||||
self.empty_time_container,
|
||||
acceptable_statuses=[2]),
|
||||
mock.call('.expiring_objects',
|
||||
self.past_time_container,
|
||||
acceptable_statuses=[2]),
|
||||
mock.call('.expiring_objects',
|
||||
self.just_past_time_container,
|
||||
acceptable_statuses=[2])])
|
||||
|
||||
def test_object_timestamp_break(self):
|
||||
with mock.patch.object(self.expirer, 'delete_actual_object') \
|
||||
|
Loading…
x
Reference in New Issue
Block a user