Merge "Add oldest failed async pending tracker"

Zuul 2024-11-05 08:22:10 +00:00 committed by Gerrit Code Review
commit 7662cde704
6 changed files with 382 additions and 6 deletions

View File

@@ -528,6 +528,12 @@ use = egg:swift#backend_ratelimit
# Must be an integer value greater than or equal to 0.
# max_deferred_updates = 10000
#
# Maximum number of account-container pairs for which the oldest async
# pending timestamp is tracked.
# async_tracker_max_entries = 100
# Maximum number of oldest async pending timestamps to dump to recon cache.
# async_tracker_dump_count = 5
#
# slowdown will sleep that amount between objects. Deprecated; use
# objects_per_second instead.
# slowdown = 0.01
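Both new options are parsed with config_positive_int_value, so they must be integers greater than 0 (see the unit tests further down). A minimal sketch of overriding them from Python, mirroring those tests and assuming swift is importable; the values are illustrative, not recommendations:

    # Sketch only: construct the daemon with the new tracker options.
    from swift.obj import updater as object_updater

    conf = {
        'async_tracker_max_entries': '200',
        'async_tracker_dump_count': '10',
    }
    daemon = object_updater.ObjectUpdater(conf)
    assert daemon.oldest_async_pendings.max_entries == 200
    assert daemon.dump_count == 10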

View File

@@ -189,7 +189,8 @@ class ReconMiddleware(object):
return self._from_recon_cache(['container_updater_sweep'],
self.container_recon_cache)
elif recon_type == 'object':
- return self._from_recon_cache(['object_updater_sweep'],
+ return self._from_recon_cache(['object_updater_sweep',
+ 'object_updater_stats'],
self.object_recon_cache)
else:
return None
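With this change the recon middleware merges the new 'object_updater_stats' key into the response for the updater/object endpoint, alongside the existing sweep time. A hedged sketch of fetching it over HTTP; 127.0.0.1:6200 is a placeholder for a real object server, and the stats shape comes from dump_recon() in the updater below:

    # Sketch only: query an object server's recon endpoint.
    import json
    from urllib.request import urlopen

    with urlopen('http://127.0.0.1:6200/recon/updater/object') as resp:
        data = json.load(resp)
    print(data.get('object_updater_sweep'))
    print(data.get('object_updater_stats'))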

View File

@@ -22,6 +22,7 @@ import sys
import time
import uuid
from random import random, shuffle
from bisect import insort
from collections import deque
from eventlet import spawn, Timeout
@@ -215,6 +216,107 @@ class BucketizedUpdateSkippingLimiter(object):
__next__ = next
class OldestAsyncPendingTracker:
"""
Manages the tracking of the oldest async pending updates for each
account-container pair using a sorted list for timestamps. Evicts the
newest pairs when t max_entries is reached. Supports retrieving the N
oldest async pending updates or calculating the age of the oldest pending
update.
"""
def __init__(
self,
max_entries,
):
self.max_entries = max_entries
self.sorted_entries = []
self.ac_to_timestamp = {}
def add_update(self, account, container, timestamp):
ac = (account, container)
if ac in self.ac_to_timestamp:
old_timestamp = self.ac_to_timestamp[ac]
# Only replace the existing timestamp if the new one is older
if timestamp < old_timestamp:
# Remove the old (timestamp, ac) from the
# sorted list
self.sorted_entries.remove((old_timestamp, ac))
# Insert the new (timestamp, ac) in the sorted order
insort(self.sorted_entries, (timestamp, ac))
# Update the ac_to_timestamp dictionary
self.ac_to_timestamp[ac] = timestamp
else:
# Insert the new (timestamp, ac) in the sorted order
insort(self.sorted_entries, (timestamp, ac))
self.ac_to_timestamp[ac] = timestamp
# Check size and evict the newest ac(s) if necessary
if len(self.ac_to_timestamp) > self.max_entries:
# Pop the newest entry (largest timestamp)
_, newest_ac = self.sorted_entries.pop()
del self.ac_to_timestamp[newest_ac]
def get_n_oldest_timestamp_acs(self, n):
oldest_entries = self.sorted_entries[:n]
return {
'oldest_count': len(oldest_entries),
'oldest_entries': [
{
'timestamp': entry[0],
'account': entry[1][0],
'container': entry[1][1],
}
for entry in oldest_entries
],
}
def get_oldest_timestamp(self):
if self.sorted_entries:
return float(self.sorted_entries[0][0])
return None
def get_oldest_timestamp_age(self):
current_time = time.time()
oldest_timestamp = self.get_oldest_timestamp()
if oldest_timestamp is not None:
return current_time - oldest_timestamp
return None
def reset(self):
self.sorted_entries = []
self.ac_to_timestamp = {}
def get_memory_usage(self):
return self._get_size(self)
def _get_size(self, obj, seen=None):
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
size = sys.getsizeof(obj)
if isinstance(obj, dict):
size += sum(
self._get_size(k, seen) + self._get_size(v, seen)
for k, v in obj.items()
)
elif hasattr(obj, '__dict__'):
size += self._get_size(obj.__dict__, seen)
elif (
hasattr(obj, '__iter__')
and not isinstance(obj, (str, bytes, bytearray))
):
size += sum(self._get_size(i, seen) for i in obj)
return size
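A short standalone sketch of the tracker's semantics, assuming swift.obj.updater is importable; the timestamps are arbitrary floats chosen for illustration:

    # Demo of the replacement and eviction rules.
    from swift.obj.updater import OldestAsyncPendingTracker

    tracker = OldestAsyncPendingTracker(2)      # track at most 2 pairs
    tracker.add_update('AUTH_a', 'c1', 100.0)
    tracker.add_update('AUTH_a', 'c2', 200.0)
    tracker.add_update('AUTH_a', 'c1', 300.0)   # ignored: newer than 100.0
    tracker.add_update('AUTH_b', 'c3', 150.0)   # evicts ('AUTH_a', 'c2')
    assert tracker.get_oldest_timestamp() == 100.0
    assert tracker.get_n_oldest_timestamp_acs(5)['oldest_count'] == 2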
class SweepStats(object):
"""
Stats bucket for an update sweep
@@ -333,7 +435,14 @@ class ObjectUpdater(Daemon):
self.recon_cache_path = conf.get('recon_cache_path',
DEFAULT_RECON_CACHE_PATH)
self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE)
max_entries = config_positive_int_value(
conf.get('async_tracker_max_entries', 100)
)
self.dump_count = config_positive_int_value(
conf.get('async_tracker_dump_count', 5)
)
self.stats = SweepStats()
self.oldest_async_pendings = OldestAsyncPendingTracker(max_entries)
self.max_deferred_updates = non_negative_int(
conf.get('max_deferred_updates', 10000))
self.begin = time.time()
@@ -384,6 +493,7 @@
os.environ.pop('NOTIFY_SOCKET', None)
eventlet_monkey_patch()
self.stats.reset()
self.oldest_async_pendings.reset()
forkbegin = time.time()
self.object_sweep(dev_path)
elapsed = time.time() - forkbegin
@@ -398,8 +508,7 @@
elapsed = time.time() - self.begin
self.logger.info('Object update sweep completed: %.02fs',
elapsed)
- dump_recon_cache({'object_updater_sweep': elapsed},
- self.rcache, self.logger)
+ self.dump_recon(elapsed)
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
@@ -408,6 +517,7 @@
self.logger.info('Begin object update single threaded sweep')
self.begin = time.time()
self.stats.reset()
self.oldest_async_pendings.reset()
for device in self._listdir(self.devices):
try:
dev_path = check_drive(self.devices, device, self.mount_check)
@@ -423,8 +533,37 @@
('Object update single-threaded sweep completed: '
'%(elapsed).02fs, %(stats)s'),
{'elapsed': elapsed, 'stats': self.stats})
- dump_recon_cache({'object_updater_sweep': elapsed},
- self.rcache, self.logger)
+ self.dump_recon(elapsed)
def dump_recon(self, elapsed):
"""Gathers stats and dumps recon cache."""
object_updater_stats = {
'failures_oldest_timestamp': (
self.oldest_async_pendings.get_oldest_timestamp()
),
'failures_oldest_timestamp_age': (
self.oldest_async_pendings.get_oldest_timestamp_age()
),
'failures_account_container_count': (
len(self.oldest_async_pendings.ac_to_timestamp)
),
'failures_oldest_timestamp_account_containers': (
self.oldest_async_pendings.get_n_oldest_timestamp_acs(
self.dump_count
)
),
'tracker_memory_usage': (
self.oldest_async_pendings.get_memory_usage()
),
}
dump_recon_cache(
{
'object_updater_sweep': elapsed,
'object_updater_stats': object_updater_stats,
},
self.rcache,
self.logger,
)
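The recon cache file then holds both the sweep time and the stats dict under the keys written above. A rough sketch of reading it back, assuming the default recon_cache_path (/var/cache/swift) and the standard object recon file name:

    # Sketch only: read the dumped stats straight from the recon cache.
    import json

    with open('/var/cache/swift/object.recon') as f:
        recon = json.load(f)
    stats = recon['object_updater_stats']
    print(stats['failures_oldest_timestamp_age'])
    print(stats['failures_account_container_count'])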
def _load_update(self, device, update_path):
try:
@@ -683,6 +822,9 @@ class ObjectUpdater(Daemon):
self.logger.increment('failures')
self.logger.debug('Update failed for %(path)s %(update_path)s',
{'path': path, 'update_path': update_path})
self.oldest_async_pendings.add_update(
acct, cont, kwargs['timestamp']
)
if new_successes:
update['successes'] = successes
rewrite_pickle = True

View File

@@ -17,6 +17,7 @@
from io import BytesIO
from unittest import main, SkipTest
from uuid import uuid4
import random
from swiftclient import client
from swiftclient.exceptions import ClientException
@@ -333,5 +334,83 @@ class TestUpdateOverridesEC(ECProbeTest):
self.assertEqual('test/ctype', listing[0]['content_type'])
class TestObjectUpdaterStats(ReplProbeTest):
def setUp(self):
super(TestObjectUpdaterStats, self).setUp()
self.int_client = self.make_internal_client()
self.container_servers = Manager(['container-server'])
def test_lots_of_asyncs(self):
# Create some (acct, cont) pairs
num_accounts = 3
num_conts_per_a = 4
ac_pairs = []
for a in range(num_accounts):
acct = 'AUTH_user%03d' % a
self.int_client.create_account(acct)
for c in range(num_conts_per_a):
cont = 'cont%03d' % c
self.int_client.create_container(acct, cont)
ac_pairs.append((acct, cont))
# Shut down a couple container servers
for n in random.sample([1, 2, 3, 4], 2):
self.container_servers.stop(number=n)
# Create a bunch of objects
num_objs_per_ac = 5
for acct, cont in ac_pairs:
for o in range(num_objs_per_ac):
obj = 'obj%03d' % o
self.int_client.upload_object(BytesIO(b''), acct, cont, obj)
all_asyncs = self.gather_async_pendings()
# Expect between 1 and 2 asyncs per object
total_objs = num_objs_per_ac * len(ac_pairs)
self.assertGreater(len(all_asyncs), total_objs)
self.assertLess(len(all_asyncs), total_objs * 2)
# Run the updater and check stats
Manager(['object-updater']).once()
recons = []
for onode in self.object_ring.devs:
recon = direct_client.direct_get_recon(onode, 'updater/object')
recons.append(recon)
self.assertEqual(4, len(recons))
found_counts = []
ac_set = set()
for recon in recons:
updater_stats = recon['object_updater_stats']
found_counts.append(
updater_stats['failures_account_container_count']
)
oldest_count = updater_stats[
'failures_oldest_timestamp_account_containers'
]['oldest_count']
self.assertEqual(oldest_count, 5)
ts_ac_entries = updater_stats[
'failures_oldest_timestamp_account_containers'
]['oldest_entries']
self.assertEqual(len(ts_ac_entries), oldest_count)
for entry in ts_ac_entries:
account = entry['account']
container = entry['container']
timestamp = entry['timestamp']
self.assertIsNotNone(timestamp)
ac_set.add((account, container))
for ac in ac_set:
self.assertIn(ac, set(ac_pairs))
for found_count in found_counts:
self.assertLessEqual(found_count, len(ac_pairs))
if __name__ == '__main__':
main()

View File

@@ -841,7 +841,7 @@ class TestReconSuccess(TestCase):
self.fakecache.fakeout = from_cache_response
rv = self.app.get_updater_info('object')
self.assertEqual(self.fakecache.fakeout_calls,
- [((['object_updater_sweep'],
+ [((['object_updater_sweep', 'object_updater_stats'],
self._full_recon_path('object')), {})])
self.assertEqual(rv, {"object_updater_sweep": 0.79848217964172363})

View File

@@ -140,6 +140,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_container_per_second, 0.0)
self.assertEqual(daemon.per_container_ratelimit_buckets, 1000)
self.assertEqual(daemon.max_deferred_updates, 10000)
self.assertEqual(daemon.oldest_async_pendings.max_entries, 100)
self.assertEqual(daemon.dump_count, 5)
# non-defaults
conf = {
@@ -153,6 +155,8 @@
'max_objects_per_container_per_second': '1.2',
'per_container_ratelimit_buckets': '100',
'max_deferred_updates': '0',
'async_tracker_max_entries': '200',
'async_tracker_dump_count': '10',
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.assertEqual(daemon.devices, '/some/where/else')
@@ -165,6 +169,8 @@
self.assertEqual(daemon.max_objects_per_container_per_second, 1.2)
self.assertEqual(daemon.per_container_ratelimit_buckets, 100)
self.assertEqual(daemon.max_deferred_updates, 0)
self.assertEqual(daemon.oldest_async_pendings.max_entries, 200)
self.assertEqual(daemon.dump_count, 10)
# check deprecated option
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
@@ -189,6 +195,8 @@ class TestObjectUpdater(unittest.TestCase):
check_bad({'max_deferred_updates': '-1'})
check_bad({'max_deferred_updates': '1.1'})
check_bad({'max_deferred_updates': 'auto'})
check_bad({'async_tracker_max_entries': '-10'})
check_bad({'async_tracker_dump_count': '-5'})
@mock.patch('os.listdir')
def test_listdir_with_exception(self, mock_listdir):
@@ -2157,6 +2165,146 @@ class TestRateLimiterBucket(unittest.TestCase):
self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])
class TestOldestAsyncPendingTracker(unittest.TestCase):
def setUp(self):
self.manager = object_updater.OldestAsyncPendingTracker(3)
def test_add_update_new_pair(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.assertEqual(self.manager.ac_to_timestamp[('a1', 'c1')], 1000.0)
self.assertIn((1000.0, ('a1', 'c1')), self.manager.sorted_entries)
def test_add_update_existing_pair(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.assertIn((1000.0, ('a1', 'c1')), self.manager.sorted_entries)
self.manager.add_update('a1', 'c1', 900.0)
self.assertEqual(self.manager.ac_to_timestamp[('a1', 'c1')], 900.0)
self.assertNotIn((1000.0, ('a1', 'c1')), self.manager.sorted_entries)
self.assertIn((900.0, ('a1', 'c1')), self.manager.sorted_entries)
self.manager.add_update('a1', 'c1', 1100.0)
self.assertEqual(self.manager.ac_to_timestamp[('a1', 'c1')], 900.0)
self.assertNotIn((1100.0, ('a1', 'c1')), self.manager.sorted_entries)
self.assertIn((900.0, ('a1', 'c1')), self.manager.sorted_entries)
def test_eviction_when_limit_exceeded(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
self.manager.add_update('a3', 'c3', 3000.0)
self.manager.add_update('a4', 'c4', 2500.0)
self.assertIn(('a4', 'c4'), self.manager.ac_to_timestamp)
self.assertNotIn(('a3', 'c3'), self.manager.ac_to_timestamp)
def test_newest_pairs_not_added_when_limit_exceeded(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
self.manager.add_update('a3', 'c3', 3000.0)
self.manager.add_update('a4', 'c4', 4000.0)
self.assertNotIn(('a4', 'c4'), self.manager.ac_to_timestamp)
self.assertIn(('a3', 'c3'), self.manager.ac_to_timestamp)
def test_get_n_oldest_timestamp_acs(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
self.manager.add_update('a3', 'c3', 3000.0)
oldest_pairs = self.manager.get_n_oldest_timestamp_acs(2)
expected_output = {
'oldest_count': 2,
'oldest_entries': [
{'timestamp': 1000.0, 'account': 'a1', 'container': 'c1'},
{'timestamp': 2000.0, 'account': 'a2', 'container': 'c2'},
],
}
self.assertEqual(oldest_pairs, expected_output)
def test_get_oldest_timestamp(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
oldest_timestamp = self.manager.get_oldest_timestamp()
self.assertEqual(oldest_timestamp, 1000.0)
self.manager.add_update('a3', 'c3', 3000.0)
oldest_timestamp = self.manager.get_oldest_timestamp()
self.assertEqual(oldest_timestamp, 1000.0)
self.manager.ac_to_timestamp.clear()
self.manager.sorted_entries = []
oldest_timestamp = self.manager.get_oldest_timestamp()
self.assertEqual(oldest_timestamp, None)
def test_get_oldest_timestamp_age(self):
current_time = time()
self.manager.add_update('a1', 'c1', current_time - 200.0)
age = self.manager.get_oldest_timestamp_age()
self.assertAlmostEqual(age, 200.0, delta=1)
def test_get_oldest_timestamp_age_no_updates(self):
age = self.manager.get_oldest_timestamp_age()
self.assertEqual(age, None)
def test_eviction_when_multiple_same_timestamps(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 1000.0)
self.manager.add_update('a3', 'c3', 1000.0)
self.manager.add_update('a4', 'c4', 500.0)
expected_present = [('a1', 'c1'), ('a2', 'c2'), ('a4', 'c4')]
expected_absent = [('a3', 'c3')]
for account_container in expected_present:
self.assertIn(account_container, self.manager.ac_to_timestamp)
for account_container in expected_absent:
self.assertNotIn(account_container, self.manager.ac_to_timestamp)
self.assertEqual(len(self.manager.ac_to_timestamp), 3)
def test_reset(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
self.manager.reset()
self.assertEqual(len(self.manager.ac_to_timestamp), 0)
self.assertEqual(len(self.manager.sorted_entries), 0)
def test_memory_usage(self):
self.manager.add_update('a1', 'c1', 1000.0)
memory_usage_before_reset = self.manager.get_memory_usage()
self.assertGreater(memory_usage_before_reset, 0)
self.manager.reset()
memory_usage_after_reset = self.manager.get_memory_usage()
self.assertLess(memory_usage_after_reset, memory_usage_before_reset)
def test_no_eviction_when_below_max_entries(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
self.assertIn(('a1', 'c1'), self.manager.ac_to_timestamp)
self.assertIn(('a2', 'c2'), self.manager.ac_to_timestamp)
self.assertEqual(len(self.manager.ac_to_timestamp), 2)
def test_get_n_oldest_timestamp_acs_exceeding_dump_count(self):
self.manager.add_update('a1', 'c1', 1000.0)
self.manager.add_update('a2', 'c2', 2000.0)
self.manager.add_update('a3', 'c3', 3000.0)
oldest_pairs = self.manager.get_n_oldest_timestamp_acs(5)
expected_output = {
'oldest_count': 3,
'oldest_entries': [
{'timestamp': 1000.0, 'account': 'a1', 'container': 'c1'},
{'timestamp': 2000.0, 'account': 'a2', 'container': 'c2'},
{'timestamp': 3000.0, 'account': 'a3', 'container': 'c3'},
],
}
self.assertEqual(oldest_pairs, expected_output)
class TestSweepStats(unittest.TestCase):
def test_copy(self):
num_props = len(vars(object_updater.SweepStats()))