Expirer: add delete task iteration related metrics
Four new metrics have been added into task iteration process: 'tasks.parse_errors': count of errors when parsing the task object 'tasks.skipped': count of task objects skipped because it doesn't belong to this expirer. 'tasks.delayed': count of objects is still within the delay 'tasks.assigned': count of assigned objects to this expirer Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: Ib151c498ba325f39570e963e5b7948080ffcd3d6
This commit is contained in:
parent
682c71afe4
commit
b400a1fdb3
@ -351,6 +351,7 @@ class ObjectExpirer(Daemon):
|
||||
except ValueError:
|
||||
self.logger.exception('Unexcepted error handling task %r' %
|
||||
task_object)
|
||||
self.logger.increment('tasks.parse_errors')
|
||||
continue
|
||||
is_async = o.get('content_type') == ASYNC_DELETE_TYPE
|
||||
delay_reaping = self.get_delay_reaping(target_account,
|
||||
@ -360,16 +361,20 @@ class ObjectExpirer(Daemon):
|
||||
# we shouldn't yield ANY more objects that can't reach
|
||||
# the expiration date yet.
|
||||
break
|
||||
if delete_timestamp > Timestamp(time() - delay_reaping) \
|
||||
and not is_async:
|
||||
# we shouldn't yield the object during the delay
|
||||
continue
|
||||
|
||||
# Only one expirer daemon assigned for each task
|
||||
if self.hash_mod('%s/%s' % (task_container, task_object),
|
||||
divisor) != my_index:
|
||||
self.logger.increment('tasks.skipped')
|
||||
continue
|
||||
|
||||
if delete_timestamp > Timestamp(time() - delay_reaping) \
|
||||
and not is_async:
|
||||
# we shouldn't yield the object during the delay
|
||||
self.logger.increment('tasks.delayed')
|
||||
continue
|
||||
|
||||
self.logger.increment('tasks.assigned')
|
||||
yield {'task_account': task_account,
|
||||
'task_container': task_container,
|
||||
'task_object': task_object,
|
||||
|
@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import itertools
|
||||
from time import time
|
||||
from unittest import main, TestCase
|
||||
from test.debug_logger import debug_logger
|
||||
@ -648,6 +649,7 @@ class TestObjectExpirer(TestCase):
|
||||
self.deleted_objects[task_container].add(task_object)
|
||||
|
||||
x = ObjectExpirer(self.conf, swift=self.fake_swift)
|
||||
x.logger = self.logger
|
||||
|
||||
deleted_objects = defaultdict(set)
|
||||
for i in range(3):
|
||||
@ -659,6 +661,11 @@ class TestObjectExpirer(TestCase):
|
||||
self.assertFalse(deleted_objects[task_container] & deleted)
|
||||
deleted_objects[task_container] |= deleted
|
||||
|
||||
self.assertEqual({
|
||||
'tasks.assigned': 10,
|
||||
'tasks.skipped': 20,
|
||||
}, self.logger.statsd_client.get_increment_counts())
|
||||
|
||||
# sort for comparison
|
||||
deleted_objects = {
|
||||
con: sorted(o_set) for con, o_set in deleted_objects.items()}
|
||||
@ -1016,6 +1023,8 @@ class TestObjectExpirer(TestCase):
|
||||
self.assertEqual(mock_delete_container.mock_calls, [
|
||||
mock.call('.expiring_objects', self.empty_time_container,
|
||||
acceptable_statuses=(2, 404, 409))])
|
||||
self.assertEqual(
|
||||
{}, self.expirer.logger.statsd_client.get_increment_counts())
|
||||
|
||||
# 404 (account/container list race) gets deleted inline
|
||||
task_account_container_list = [
|
||||
@ -1030,6 +1039,8 @@ class TestObjectExpirer(TestCase):
|
||||
self.assertEqual(mock_delete_container.mock_calls, [
|
||||
mock.call('.expiring_objects', 'does-not-matter',
|
||||
acceptable_statuses=(2, 404, 409))])
|
||||
self.assertEqual(
|
||||
{}, self.expirer.logger.statsd_client.get_increment_counts())
|
||||
|
||||
# ready containers are processed
|
||||
task_account_container_list = [
|
||||
@ -1048,8 +1059,13 @@ class TestObjectExpirer(TestCase):
|
||||
expected)
|
||||
# not empty; not deleted
|
||||
self.assertEqual(mock_delete_container.mock_calls, [])
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 5},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
# the task queue has invalid task object
|
||||
self.expirer.logger.statsd_client.clear()
|
||||
invalid_aco_dict = deepcopy(self.fake_swift.aco_dict)
|
||||
invalid_aco_dict['.expiring_objects'][self.past_time_container].insert(
|
||||
0, self.past_time + '-invalid0')
|
||||
@ -1064,8 +1080,13 @@ class TestObjectExpirer(TestCase):
|
||||
list(x.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor)),
|
||||
expected)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 5, 'tasks.parse_errors': 2},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
# test some of that async delete
|
||||
self.expirer.logger.statsd_client.clear()
|
||||
async_delete_aco_dict = {
|
||||
'.expiring_objects': {
|
||||
# this task container will be checked
|
||||
@ -1112,6 +1133,104 @@ class TestObjectExpirer(TestCase):
|
||||
task_account_container_list, my_index, divisor))
|
||||
|
||||
self.assertEqual(expected, found)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 10},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
def test_iter_task_to_expire_with_skipped_tasks_single_process(self):
|
||||
# Only one task is assigned to the tested expirer
|
||||
my_index = 0
|
||||
divisor = 10
|
||||
task_account_container_list = [
|
||||
(".expiring_objects", self.past_time_container)
|
||||
]
|
||||
|
||||
expected = [
|
||||
self.make_task(
|
||||
self.past_time_container,
|
||||
self.past_time,
|
||||
self.expired_target_paths[self.past_time][0],
|
||||
)
|
||||
]
|
||||
# Use mock of hash_mod to output predictable result.
|
||||
with mock.patch.object(self.expirer, "hash_mod",
|
||||
side_effect=itertools.cycle(range(10))):
|
||||
self.assertEqual(
|
||||
expected,
|
||||
list(
|
||||
self.expirer.iter_task_to_expire(
|
||||
task_account_container_list, my_index, divisor
|
||||
)
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
{"tasks.assigned": 1, "tasks.skipped": 4},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
def test_iter_task_to_expire_with_skipped_tasks_multi_processes(self):
|
||||
processes = 10
|
||||
task_account_container_list = [
|
||||
(".expiring_objects", self.past_time_container),
|
||||
(".expiring_objects", self.just_past_time_container),
|
||||
(".expiring_objects", self.future_time_container),
|
||||
]
|
||||
|
||||
total_tasks = 0
|
||||
for i in range(processes):
|
||||
yielded_tasks = list(
|
||||
self.expirer.iter_task_to_expire(
|
||||
task_account_container_list, i, processes
|
||||
))
|
||||
total_tasks += len(yielded_tasks)
|
||||
# Ten tasks, each process gets 1 on overage.
|
||||
# N.B. each process may get 0 or multiple tasks, since hash_mod is
|
||||
# based on names of current time.
|
||||
self.assertEqual(10, total_tasks)
|
||||
|
||||
# On overage, each process was assigned 1 task and skipped 9
|
||||
self.assertEqual({
|
||||
'tasks.assigned': 10,
|
||||
'tasks.skipped': 90,
|
||||
}, self.expirer.logger.statsd_client.get_increment_counts())
|
||||
|
||||
def test_iter_task_to_expire_with_skipped_and_delayed_tasks(self):
|
||||
divisor = 3
|
||||
task_account_container_list = [
|
||||
(".expiring_objects", self.past_time_container),
|
||||
(".expiring_objects", self.just_past_time_container),
|
||||
]
|
||||
expected_task_paths = [
|
||||
path
|
||||
for path in sorted(self.expired_target_paths[self.past_time] +
|
||||
self.expired_target_paths[self.just_past_time])
|
||||
if not path.startswith('a1') # delayed task
|
||||
]
|
||||
self.assertEqual(9, len(expected_task_paths)) # sanity check
|
||||
|
||||
actual_task_paths = []
|
||||
proc_stats = defaultdict(int)
|
||||
for process in range(divisor):
|
||||
self.conf['delay_reaping_a1'] = 2 * 86400
|
||||
self.conf['process'] = process
|
||||
self.conf['processes'] = 3
|
||||
x = expirer.ObjectExpirer(self.conf, logger=debug_logger(),
|
||||
swift=self.fake_swift)
|
||||
actual_task_paths.extend(
|
||||
sorted([task['target_path'] for task in
|
||||
x.iter_task_to_expire(
|
||||
task_account_container_list, process, divisor)]))
|
||||
for k, v in x.logger.statsd_client.get_increment_counts().items():
|
||||
proc_stats[k] += v
|
||||
|
||||
self.assertEqual(
|
||||
{"tasks.skipped": 20,
|
||||
"tasks.delayed": 1,
|
||||
"tasks.assigned": 9},
|
||||
proc_stats,
|
||||
)
|
||||
self.assertEqual(expected_task_paths, sorted(actual_task_paths))
|
||||
|
||||
def test_iter_task_to_expire_with_delay_reaping(self):
|
||||
aco_dict = {
|
||||
@ -1169,8 +1288,13 @@ class TestObjectExpirer(TestCase):
|
||||
observed = list(x.iter_task_to_expire(
|
||||
task_account_container_list, 0, 1))
|
||||
self.assertEqual(expected, observed)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 6},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
# configure delay for account a1
|
||||
self.expirer.logger.statsd_client.clear()
|
||||
self.conf['delay_reaping_a1'] = 300.0
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=fake_swift)
|
||||
@ -1197,10 +1321,15 @@ class TestObjectExpirer(TestCase):
|
||||
observed = list(x.iter_task_to_expire(
|
||||
task_account_container_list, 0, 1))
|
||||
self.assertEqual(expected, observed)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 4, 'tasks.delayed': 2},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
# configure delay for account a1 and for account a1 and container c2
|
||||
# container a1/c2 expires expires almost immediately
|
||||
# but other containers in account a1 remain (a1/c1 and a1/c3)
|
||||
self.expirer.logger.statsd_client.clear()
|
||||
self.conf['delay_reaping_a1'] = 300.0
|
||||
self.conf['delay_reaping_a1/c2'] = 0.1
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
@ -1230,9 +1359,14 @@ class TestObjectExpirer(TestCase):
|
||||
observed = list(x.iter_task_to_expire(
|
||||
task_account_container_list, 0, 1))
|
||||
self.assertEqual(expected, observed)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 5, 'tasks.delayed': 1},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
# configure delay for account a1 and for account a1 and container c2
|
||||
# container a1/c2 does not expire but others in account a1 do
|
||||
self.expirer.logger.statsd_client.clear()
|
||||
self.conf['delay_reaping_a1'] = 0.1
|
||||
self.conf['delay_reaping_a1/c2'] = 300.0
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
@ -1262,6 +1396,10 @@ class TestObjectExpirer(TestCase):
|
||||
observed = list(x.iter_task_to_expire(
|
||||
task_account_container_list, 0, 1))
|
||||
self.assertEqual(expected, observed)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 5, 'tasks.delayed': 1},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
def test_iter_task_to_expire_with_delay_reaping_is_async(self):
|
||||
aco_dict = {
|
||||
@ -1432,6 +1570,10 @@ class TestObjectExpirer(TestCase):
|
||||
lambda a, c, o: None):
|
||||
self.expirer.run_once()
|
||||
self.assertEqual(self.expirer.report_objects, 10)
|
||||
self.assertEqual(
|
||||
{'tasks.assigned': 10, 'objects': 10},
|
||||
self.expirer.logger.statsd_client.get_increment_counts()
|
||||
)
|
||||
|
||||
def test_delete_actual_object_gets_native_string(self):
|
||||
got_str = [False]
|
||||
|
Loading…
Reference in New Issue
Block a user