diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index d6c3d5b6c4..6a1cf386cd 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -328,6 +328,70 @@ class ObjectExpirer(Daemon): return get_delay_reaping(self.delay_reaping_times, target_account, target_container) + def _iter_task_container(self, task_account, task_container, + my_index, divisor): + """ + Iterates the input task container, yields a task expire info dict for + each delete task if it is assigned to this expirer process. + + :raises UnexpectedResponse: if the task container listing is not + successful. + """ + container_empty = True + for o in self.swift.iter_objects(task_account, + task_container, + acceptable_statuses=[2]): + container_empty = False + task_object = o['name'].encode('utf8') if six.PY2 else o['name'] + try: + delete_timestamp, target_account, target_container, \ + target_object = parse_task_obj(task_object) + except ValueError: + self.logger.exception('Unexcepted error handling task %r' % + task_object) + self.logger.increment('tasks.parse_errors') + continue + is_async = o.get('content_type') == ASYNC_DELETE_TYPE + delay_reaping = self.get_delay_reaping(target_account, + target_container) + + if delete_timestamp > Timestamp.now(): + # we shouldn't yield ANY more objects that can't reach + # the expiration date yet. + break + + # Only one expirer daemon assigned for each task + if self.hash_mod('%s/%s' % (task_container, task_object), + divisor) != my_index: + self.logger.increment('tasks.skipped') + continue + + if delete_timestamp > Timestamp(time() - delay_reaping) \ + and not is_async: + # we shouldn't yield the object during the delay + self.logger.increment('tasks.delayed') + continue + + self.logger.increment('tasks.assigned') + yield {'task_account': task_account, + 'task_container': task_container, + 'task_object': task_object, + 'target_path': '/'.join([ + target_account, target_container, target_object]), + 'delete_timestamp': delete_timestamp, + 'is_async_delete': is_async} + if container_empty: + try: + self.swift.delete_container( + task_account, task_container, + acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT)) + except (Exception, Timeout) as err: + self.logger.exception( + 'Exception while deleting container %(account)s ' + '%(container)s %(err)s', { + 'account': task_account, + 'container': task_container, 'err': str(err)}) + def iter_task_to_expire(self, task_account_container_list, my_index, divisor): """ @@ -335,61 +399,27 @@ class ObjectExpirer(Daemon): task_container, task_object, timestamp_to_delete, and target_path """ for task_account, task_container in task_account_container_list: - container_empty = True - for o in self.swift.iter_objects(task_account, task_container): - container_empty = False - if six.PY2: - task_object = o['name'].encode('utf8') - else: - task_object = o['name'] - try: - delete_timestamp, target_account, target_container, \ - target_object = parse_task_obj(task_object) - except ValueError: - self.logger.exception('Unexcepted error handling task %r' % - task_object) - self.logger.increment('tasks.parse_errors') - continue - is_async = o.get('content_type') == ASYNC_DELETE_TYPE - delay_reaping = self.get_delay_reaping(target_account, - target_container) - - if delete_timestamp > Timestamp.now(): - # we shouldn't yield ANY more objects that can't reach - # the expiration date yet. - break - - # Only one expirer daemon assigned for each task - if self.hash_mod('%s/%s' % (task_container, task_object), - divisor) != my_index: - self.logger.increment('tasks.skipped') - continue - - if delete_timestamp > Timestamp(time() - delay_reaping) \ - and not is_async: - # we shouldn't yield the object during the delay - self.logger.increment('tasks.delayed') - continue - - self.logger.increment('tasks.assigned') - yield {'task_account': task_account, - 'task_container': task_container, - 'task_object': task_object, - 'target_path': '/'.join([ - target_account, target_container, target_object]), - 'delete_timestamp': delete_timestamp, - 'is_async_delete': is_async} - if container_empty: - try: - self.swift.delete_container( - task_account, task_container, - acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT)) - except (Exception, Timeout) as err: - self.logger.exception( - 'Exception while deleting container %(account)s ' - '%(container)s %(err)s', { + try: + for item in self._iter_task_container( + task_account, task_container, my_index, divisor): + yield item + except UnexpectedResponse as err: + if err.resp.status_int != 404: + self.logger.error( + 'Unexpected response while listing objects in ' + 'container %(account)s %(container)s: %(err)s', { 'account': task_account, - 'container': task_container, 'err': str(err)}) + 'container': task_container, + 'err': str(err) + }) + except (Exception, Timeout) as err: + self.logger.error( + 'Exception while listing objects in container %(account)s ' + '%(container)s: %(err)s', { + 'account': task_account, + 'container': task_container, + 'err': str(err) + }) def run_once(self, *args, **kwargs): """ diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 501c5522d9..7d7378761c 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -31,6 +31,7 @@ from six.moves import urllib from swift.common import internal_client, utils, swob from swift.common.utils import Timestamp +from swift.common.swob import Response from swift.obj import expirer, diskfile @@ -83,7 +84,7 @@ class FakeInternalClient(object): def delete_container(*a, **kw): pass - def iter_objects(self, account, container): + def iter_objects(self, account, container, **kwargs): acc_dict = self.aco_dict[account] obj_iter = acc_dict.get(container, []) resp = [] @@ -1580,6 +1581,172 @@ class TestObjectExpirer(TestCase): task_account_container_list, 0, 1)) self.assertEqual(expected, observed) + def test_iter_task_to_expire_unexpected_response(self): + # Test that object listing on the first container returns 503 and raise + # UnexpectedResponse, and expect the second task container will + # continue to be processed. + # In this test, all tasks are assigned to the tested expirer. + my_index = 0 + divisor = 1 + + # Store reference to the real method before mocking + real_iter_objects = self.fake_swift.iter_objects + + def mock_iter_objects(account, container, **kwargs): + if container == self.just_past_time_container: + mock_resp = Response(status=503) + raise internal_client.UnexpectedResponse( + 'Mocked error', mock_resp) + return real_iter_objects(account, container) + + task_account_container_list = [ + ('.expiring_objects', self.just_past_time_container), + ('.expiring_objects', self.past_time_container) + ] + expected = [ + self.make_task(self.past_time_container, self.past_time, + target_path) + for target_path in self.expired_target_paths[self.past_time]] + + with mock.patch.object(self.expirer.swift, 'iter_objects', + mock_iter_objects): + with mock.patch.object(self.expirer.swift, 'delete_container') \ + as mock_delete_container: + self.assertEqual( + list(self.expirer.iter_task_to_expire( + task_account_container_list, my_index, divisor)), + expected) + self.assertEqual(mock_delete_container.mock_calls, []) + + log_lines = self.logger.get_lines_for_level('error') + self.assertEqual( + log_lines, + ['Unexpected response while listing objects in container ' + '.expiring_objects %s: Mocked error' + % self.just_past_time_container] + ) + self.assertEqual( + {'tasks.assigned': 5}, + self.expirer.logger.statsd_client.get_increment_counts() + ) + + def test_iter_task_to_expire_exception(self): + # Test that object listing on the first container raise Exception, and + # expect the second task container will continue to be processed. + # In this test, all tasks are assigned to the tested expirer. + my_index = 0 + divisor = 1 + + # Store reference to the real method before mocking + real_iter_objects = self.fake_swift.iter_objects + + def mock_iter_objects(account, container, **kwargs): + if container == self.just_past_time_container: + raise Exception('failed to connect') + return real_iter_objects(account, container) + + task_account_container_list = [ + ('.expiring_objects', self.just_past_time_container), + ('.expiring_objects', self.past_time_container) + ] + expected = [ + self.make_task(self.past_time_container, self.past_time, + target_path) + for target_path in self.expired_target_paths[self.past_time]] + + with mock.patch.object(self.expirer.swift, 'iter_objects', + mock_iter_objects): + with mock.patch.object(self.expirer.swift, 'delete_container') \ + as mock_delete_container: + self.assertEqual( + list(self.expirer.iter_task_to_expire( + task_account_container_list, my_index, divisor)), + expected) + self.assertEqual(mock_delete_container.mock_calls, []) + + log_lines = self.logger.get_lines_for_level('error') + self.assertEqual( + log_lines, + ['Exception while listing objects in container ' + '.expiring_objects %s: failed to connect' + % self.just_past_time_container] + ) + self.assertEqual( + {'tasks.assigned': 5}, + self.expirer.logger.statsd_client.get_increment_counts() + ) + + def test_iter_task_to_expire_404_response_on_empty_container(self): + # Test that object listing on an empty container returns 404 and + # raise UnexpectedResponse, and expect ``iter_task_to_expire`` won't + # delete this task container. + # In this test, all tasks are assigned to the tested expirer. + my_index = 0 + divisor = 1 + + err_resp = Response(status=404) + err = internal_client.UnexpectedResponse('Mocked error', err_resp) + + task_account_container_list = [ + ('.expiring_objects', self.empty_time_container) + ] + + with mock.patch.object(self.expirer.swift, 'iter_objects', + side_effect=err) as mock_method: + with mock.patch.object(self.expirer.swift, 'delete_container') \ + as mock_delete_container: + self.assertEqual( + list(self.expirer.iter_task_to_expire( + task_account_container_list, my_index, divisor)), + []) + log_lines = self.logger.get_lines_for_level('error') + self.assertFalse(log_lines) + # This empty task container won't get deleted. + self.assertEqual(mock_delete_container.mock_calls, []) + self.assertEqual( + {}, self.expirer.logger.statsd_client.get_increment_counts()) + self.assertEqual(mock_method.call_args_list, [ + mock.call('.expiring_objects', + self.empty_time_container, + acceptable_statuses=[2]) + ]) + + def test_iter_task_to_expire_503_response_on_empty_container(self): + # Test that object listing on an empty container returns 503 and + # raise UnexpectedResponse, and expect ``iter_task_to_expire`` won't + # delete this task container. + # In this test, all tasks are assigned to the tested expirer. + my_index = 0 + divisor = 1 + + def mock_iter_objects(account, container, **kwargs): + mock_resp = Response(status=503) + raise internal_client.UnexpectedResponse('Mocked error', mock_resp) + + task_account_container_list = [ + ('.expiring_objects', self.empty_time_container) + ] + + with mock.patch.object(self.expirer.swift, 'iter_objects', + side_effect=mock_iter_objects): + with mock.patch.object(self.expirer.swift, 'delete_container') \ + as mock_delete_container: + self.assertEqual( + list(self.expirer.iter_task_to_expire( + task_account_container_list, my_index, divisor)), + []) + log_lines = self.logger.get_lines_for_level('error') + self.assertEqual( + log_lines[0], + 'Unexpected response while listing objects in container ' + '.expiring_objects %s: Mocked error' + % self.empty_time_container, + ) + # This empty task container won't get deleted. + self.assertEqual(mock_delete_container.mock_calls, []) + self.assertEqual( + {}, self.expirer.logger.statsd_client.get_increment_counts()) + def test_run_once_unicode_problem(self): requests = [] @@ -1598,9 +1765,15 @@ class TestObjectExpirer(TestCase): # iter_objects is called only for past_time, not future_time self.assertEqual(mock_method.call_args_list, [ - mock.call('.expiring_objects', self.empty_time_container), - mock.call('.expiring_objects', self.past_time_container), - mock.call('.expiring_objects', self.just_past_time_container)]) + mock.call('.expiring_objects', + self.empty_time_container, + acceptable_statuses=[2]), + mock.call('.expiring_objects', + self.past_time_container, + acceptable_statuses=[2]), + mock.call('.expiring_objects', + self.just_past_time_container, + acceptable_statuses=[2])]) def test_object_timestamp_break(self): with mock.patch.object(self.expirer, 'delete_actual_object') \