diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 0a0ad101b9..b96db3e54b 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -351,6 +351,7 @@ class ObjectExpirer(Daemon): except ValueError: self.logger.exception('Unexcepted error handling task %r' % task_object) + self.logger.increment('tasks.parse_errors') continue is_async = o.get('content_type') == ASYNC_DELETE_TYPE delay_reaping = self.get_delay_reaping(target_account, @@ -360,16 +361,20 @@ class ObjectExpirer(Daemon): # we shouldn't yield ANY more objects that can't reach # the expiration date yet. break - if delete_timestamp > Timestamp(time() - delay_reaping) \ - and not is_async: - # we shouldn't yield the object during the delay - continue # Only one expirer daemon assigned for each task if self.hash_mod('%s/%s' % (task_container, task_object), divisor) != my_index: + self.logger.increment('tasks.skipped') continue + if delete_timestamp > Timestamp(time() - delay_reaping) \ + and not is_async: + # we shouldn't yield the object during the delay + self.logger.increment('tasks.delayed') + continue + + self.logger.increment('tasks.assigned') yield {'task_account': task_account, 'task_container': task_container, 'task_object': task_object, diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py index 0fe134d3d7..001246b10b 100644 --- a/test/unit/obj/test_expirer.py +++ b/test/unit/obj/test_expirer.py @@ -15,6 +15,7 @@ # limitations under the License. 
import os +import itertools from time import time from unittest import main, TestCase from test.debug_logger import debug_logger @@ -648,6 +649,7 @@ class TestObjectExpirer(TestCase): self.deleted_objects[task_container].add(task_object) x = ObjectExpirer(self.conf, swift=self.fake_swift) + x.logger = self.logger deleted_objects = defaultdict(set) for i in range(3): @@ -659,6 +661,11 @@ class TestObjectExpirer(TestCase): self.assertFalse(deleted_objects[task_container] & deleted) deleted_objects[task_container] |= deleted + self.assertEqual({ + 'tasks.assigned': 10, + 'tasks.skipped': 20, + }, self.logger.statsd_client.get_increment_counts()) + # sort for comparison deleted_objects = { con: sorted(o_set) for con, o_set in deleted_objects.items()} @@ -1016,6 +1023,8 @@ class TestObjectExpirer(TestCase): self.assertEqual(mock_delete_container.mock_calls, [ mock.call('.expiring_objects', self.empty_time_container, acceptable_statuses=(2, 404, 409))]) + self.assertEqual( + {}, self.expirer.logger.statsd_client.get_increment_counts()) # 404 (account/container list race) gets deleted inline task_account_container_list = [ @@ -1030,6 +1039,8 @@ class TestObjectExpirer(TestCase): self.assertEqual(mock_delete_container.mock_calls, [ mock.call('.expiring_objects', 'does-not-matter', acceptable_statuses=(2, 404, 409))]) + self.assertEqual( + {}, self.expirer.logger.statsd_client.get_increment_counts()) # ready containers are processed task_account_container_list = [ @@ -1048,8 +1059,13 @@ class TestObjectExpirer(TestCase): expected) # not empty; not deleted self.assertEqual(mock_delete_container.mock_calls, []) + self.assertEqual( + {'tasks.assigned': 5}, + self.expirer.logger.statsd_client.get_increment_counts() + ) # the task queue has invalid task object + self.expirer.logger.statsd_client.clear() invalid_aco_dict = deepcopy(self.fake_swift.aco_dict) invalid_aco_dict['.expiring_objects'][self.past_time_container].insert( 0, self.past_time + '-invalid0') @@ -1064,8 +1080,13 
@@ class TestObjectExpirer(TestCase): list(x.iter_task_to_expire( task_account_container_list, my_index, divisor)), expected) + self.assertEqual( + {'tasks.assigned': 5, 'tasks.parse_errors': 2}, + self.expirer.logger.statsd_client.get_increment_counts() + ) # test some of that async delete + self.expirer.logger.statsd_client.clear() async_delete_aco_dict = { '.expiring_objects': { # this task container will be checked @@ -1112,6 +1133,104 @@ class TestObjectExpirer(TestCase): task_account_container_list, my_index, divisor)) self.assertEqual(expected, found) + self.assertEqual( + {'tasks.assigned': 10}, + self.expirer.logger.statsd_client.get_increment_counts() + ) + + def test_iter_task_to_expire_with_skipped_tasks_single_process(self): + # Only one task is assigned to the tested expirer + my_index = 0 + divisor = 10 + task_account_container_list = [ + (".expiring_objects", self.past_time_container) + ] + + expected = [ + self.make_task( + self.past_time_container, + self.past_time, + self.expired_target_paths[self.past_time][0], + ) + ] + # Use mock of hash_mod to output predictable result. + with mock.patch.object(self.expirer, "hash_mod", + side_effect=itertools.cycle(range(10))): + self.assertEqual( + expected, + list( + self.expirer.iter_task_to_expire( + task_account_container_list, my_index, divisor + ) + ) + ) + self.assertEqual( + {"tasks.assigned": 1, "tasks.skipped": 4}, + self.expirer.logger.statsd_client.get_increment_counts() + ) + + def test_iter_task_to_expire_with_skipped_tasks_multi_processes(self): + processes = 10 + task_account_container_list = [ + (".expiring_objects", self.past_time_container), + (".expiring_objects", self.just_past_time_container), + (".expiring_objects", self.future_time_container), + ] + + total_tasks = 0 + for i in range(processes): + yielded_tasks = list( + self.expirer.iter_task_to_expire( + task_account_container_list, i, processes + )) + total_tasks += len(yielded_tasks) + # Ten tasks, each process gets 1 on average. 
+ # N.B. each process may get 0 or multiple tasks, since hash_mod is + # based on names of current time. + self.assertEqual(10, total_tasks) + + # On average, each process was assigned 1 task and skipped 9 + self.assertEqual({ + 'tasks.assigned': 10, + 'tasks.skipped': 90, + }, self.expirer.logger.statsd_client.get_increment_counts()) + + def test_iter_task_to_expire_with_skipped_and_delayed_tasks(self): + divisor = 3 + task_account_container_list = [ + (".expiring_objects", self.past_time_container), + (".expiring_objects", self.just_past_time_container), + ] + expected_task_paths = [ + path + for path in sorted(self.expired_target_paths[self.past_time] + + self.expired_target_paths[self.just_past_time]) + if not path.startswith('a1') # delayed task + ] + self.assertEqual(9, len(expected_task_paths)) # sanity check + + actual_task_paths = [] + proc_stats = defaultdict(int) + for process in range(divisor): + self.conf['delay_reaping_a1'] = 2 * 86400 + self.conf['process'] = process + self.conf['processes'] = 3 + x = expirer.ObjectExpirer(self.conf, logger=debug_logger(), + swift=self.fake_swift) + actual_task_paths.extend( + sorted([task['target_path'] for task in + x.iter_task_to_expire( + task_account_container_list, process, divisor)])) + for k, v in x.logger.statsd_client.get_increment_counts().items(): + proc_stats[k] += v + + self.assertEqual( + {"tasks.skipped": 20, + "tasks.delayed": 1, + "tasks.assigned": 9}, + proc_stats, + ) + self.assertEqual(expected_task_paths, sorted(actual_task_paths)) def test_iter_task_to_expire_with_delay_reaping(self): aco_dict = { @@ -1169,8 +1288,13 @@ class TestObjectExpirer(TestCase): observed = list(x.iter_task_to_expire( task_account_container_list, 0, 1)) self.assertEqual(expected, observed) + self.assertEqual( + {'tasks.assigned': 6}, + self.expirer.logger.statsd_client.get_increment_counts() + ) # configure delay for account a1 + self.expirer.logger.statsd_client.clear() self.conf['delay_reaping_a1'] = 300.0 x = 
expirer.ObjectExpirer(self.conf, logger=self.logger, swift=fake_swift) @@ -1197,10 +1321,15 @@ class TestObjectExpirer(TestCase): observed = list(x.iter_task_to_expire( task_account_container_list, 0, 1)) self.assertEqual(expected, observed) + self.assertEqual( + {'tasks.assigned': 4, 'tasks.delayed': 2}, + self.expirer.logger.statsd_client.get_increment_counts() + ) # configure delay for account a1 and for account a1 and container c2 # container a1/c2 expires expires almost immediately # but other containers in account a1 remain (a1/c1 and a1/c3) + self.expirer.logger.statsd_client.clear() self.conf['delay_reaping_a1'] = 300.0 self.conf['delay_reaping_a1/c2'] = 0.1 x = expirer.ObjectExpirer(self.conf, logger=self.logger, @@ -1230,9 +1359,14 @@ class TestObjectExpirer(TestCase): observed = list(x.iter_task_to_expire( task_account_container_list, 0, 1)) self.assertEqual(expected, observed) + self.assertEqual( + {'tasks.assigned': 5, 'tasks.delayed': 1}, + self.expirer.logger.statsd_client.get_increment_counts() + ) # configure delay for account a1 and for account a1 and container c2 # container a1/c2 does not expire but others in account a1 do + self.expirer.logger.statsd_client.clear() self.conf['delay_reaping_a1'] = 0.1 self.conf['delay_reaping_a1/c2'] = 300.0 x = expirer.ObjectExpirer(self.conf, logger=self.logger, @@ -1262,6 +1396,10 @@ class TestObjectExpirer(TestCase): observed = list(x.iter_task_to_expire( task_account_container_list, 0, 1)) self.assertEqual(expected, observed) + self.assertEqual( + {'tasks.assigned': 5, 'tasks.delayed': 1}, + self.expirer.logger.statsd_client.get_increment_counts() + ) def test_iter_task_to_expire_with_delay_reaping_is_async(self): aco_dict = { @@ -1432,6 +1570,10 @@ class TestObjectExpirer(TestCase): lambda a, c, o: None): self.expirer.run_once() self.assertEqual(self.expirer.report_objects, 10) + self.assertEqual( + {'tasks.assigned': 10, 'objects': 10}, + self.expirer.logger.statsd_client.get_increment_counts() + ) def 
test_delete_actual_object_gets_native_string(self): got_str = [False]