diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 10c050906f..d940c10d41 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -528,6 +528,12 @@ use = egg:swift#backend_ratelimit # Must be an integer value greater than or equal to 0. # max_deferred_updates = 10000 # +# Maximum number of oldest async pending timestamps to track for each +# account-container pair. +# async_tracker_max_entries = 100 +# Maximum number of oldest async pending timestamps to dump to recon cache. +# async_tracker_dump_count = 5 +# # slowdown will sleep that amount between objects. Deprecated; use # objects_per_second instead. # slowdown = 0.01 diff --git a/swift/common/middleware/recon.py b/swift/common/middleware/recon.py index fe4ff941c1..d494e3a144 100644 --- a/swift/common/middleware/recon.py +++ b/swift/common/middleware/recon.py @@ -189,7 +189,8 @@ class ReconMiddleware(object): return self._from_recon_cache(['container_updater_sweep'], self.container_recon_cache) elif recon_type == 'object': - return self._from_recon_cache(['object_updater_sweep'], + return self._from_recon_cache(['object_updater_sweep', + 'object_updater_stats'], self.object_recon_cache) else: return None diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 63e03ecdda..c019c35795 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -22,6 +22,7 @@ import sys import time import uuid from random import random, shuffle +from bisect import insort from collections import deque from eventlet import spawn, Timeout @@ -215,6 +216,107 @@ class BucketizedUpdateSkippingLimiter(object): __next__ = next +class OldestAsyncPendingTracker: + """ + Manages the tracking of the oldest async pending updates for each + account-container pair using a sorted list for timestamps. Evicts the + newest pairs when t max_entries is reached. Supports retrieving the N + oldest async pending updates or calculating the age of the oldest pending + update. + """ + def __init__( + self, + max_entries, + ): + self.max_entries = max_entries + self.sorted_entries = [] + self.ac_to_timestamp = {} + + def add_update(self, account, container, timestamp): + ac = (account, container) + + if ac in self.ac_to_timestamp: + old_timestamp = self.ac_to_timestamp[ac] + # Only replace the existing timestamp if the new one is older + if timestamp < old_timestamp: + # Remove the old (timestamp, ac) from the + # sorted list + self.sorted_entries.remove((old_timestamp, ac)) + # Insert the new (timestamp, ac) in the sorted order + insort(self.sorted_entries, (timestamp, ac)) + # Update the ac_to_timestamp dictionary + self.ac_to_timestamp[ac] = timestamp + else: + # Insert the new (timestamp, ac) in the sorted order + insort(self.sorted_entries, (timestamp, ac)) + self.ac_to_timestamp[ac] = timestamp + + # Check size and evict the newest ac(s) if necessary + if (len(self.ac_to_timestamp) > self.max_entries): + # Pop the newest entry (largest timestamp) + _, newest_ac = (self.sorted_entries.pop()) + del self.ac_to_timestamp[newest_ac] + + def get_n_oldest_timestamp_acs(self, n): + oldest_entries = self.sorted_entries[:n] + return { + 'oldest_count': len(oldest_entries), + 'oldest_entries': [ + { + 'timestamp': entry[0], + 'account': entry[1][0], + 'container': entry[1][1], + } + for entry in oldest_entries + ], + } + + def get_oldest_timestamp(self): + if self.sorted_entries: + return float(self.sorted_entries[0][0]) + return None + + def get_oldest_timestamp_age(self): + current_time = time.time() + oldest_timestamp = self.get_oldest_timestamp() + if oldest_timestamp is not None: + return current_time - oldest_timestamp + return None + + def reset(self): + self.sorted_entries = [] + self.ac_to_timestamp = {} + + def get_memory_usage(self): + return self._get_size(self) + + def _get_size(self, obj, seen=None): + if seen is None: + seen = set() + + obj_id = id(obj) + if obj_id in seen: + return 0 + seen.add(obj_id) + + size = sys.getsizeof(obj) + + if isinstance(obj, dict): + size += sum( + self._get_size(k, seen) + self._get_size(v, seen) + for k, v in obj.items() + ) + elif hasattr(obj, '__dict__'): + size += self._get_size(obj.__dict__, seen) + elif ( + hasattr(obj, '__iter__') + and not isinstance(obj, (str, bytes, bytearray)) + ): + size += sum(self._get_size(i, seen) for i in obj) + + return size + + class SweepStats(object): """ Stats bucket for an update sweep @@ -333,7 +435,14 @@ class ObjectUpdater(Daemon): self.recon_cache_path = conf.get('recon_cache_path', DEFAULT_RECON_CACHE_PATH) self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE) + max_entries = config_positive_int_value( + conf.get('async_tracker_max_entries', 100) + ) + self.dump_count = config_positive_int_value( + conf.get('async_tracker_dump_count', 5) + ) self.stats = SweepStats() + self.oldest_async_pendings = OldestAsyncPendingTracker(max_entries) self.max_deferred_updates = non_negative_int( conf.get('max_deferred_updates', 10000)) self.begin = time.time() @@ -384,6 +493,7 @@ class ObjectUpdater(Daemon): os.environ.pop('NOTIFY_SOCKET', None) eventlet_monkey_patch() self.stats.reset() + self.oldest_async_pendings.reset() forkbegin = time.time() self.object_sweep(dev_path) elapsed = time.time() - forkbegin @@ -398,8 +508,7 @@ class ObjectUpdater(Daemon): elapsed = time.time() - self.begin self.logger.info('Object update sweep completed: %.02fs', elapsed) - dump_recon_cache({'object_updater_sweep': elapsed}, - self.rcache, self.logger) + self.dump_recon(elapsed) if elapsed < self.interval: time.sleep(self.interval - elapsed) @@ -408,6 +517,7 @@ class ObjectUpdater(Daemon): self.logger.info('Begin object update single threaded sweep') self.begin = time.time() self.stats.reset() + self.oldest_async_pendings.reset() for device in self._listdir(self.devices): try: dev_path = check_drive(self.devices, device, self.mount_check) @@ -423,8 +533,37 @@ class ObjectUpdater(Daemon): ('Object update single-threaded sweep completed: ' '%(elapsed).02fs, %(stats)s'), {'elapsed': elapsed, 'stats': self.stats}) - dump_recon_cache({'object_updater_sweep': elapsed}, - self.rcache, self.logger) + self.dump_recon(elapsed) + + def dump_recon(self, elapsed): + """Gathers stats and dumps recon cache.""" + object_updater_stats = { + 'failures_oldest_timestamp': ( + self.oldest_async_pendings.get_oldest_timestamp() + ), + 'failures_oldest_timestamp_age': ( + self.oldest_async_pendings.get_oldest_timestamp_age() + ), + 'failures_account_container_count': ( + len(self.oldest_async_pendings.ac_to_timestamp) + ), + 'failures_oldest_timestamp_account_containers': ( + self.oldest_async_pendings.get_n_oldest_timestamp_acs( + self.dump_count + ) + ), + 'tracker_memory_usage': ( + self.oldest_async_pendings.get_memory_usage() + ), + } + dump_recon_cache( + { + 'object_updater_sweep': elapsed, + 'object_updater_stats': object_updater_stats, + }, + self.rcache, + self.logger, + ) def _load_update(self, device, update_path): try: @@ -683,6 +822,9 @@ class ObjectUpdater(Daemon): self.logger.increment('failures') self.logger.debug('Update failed for %(path)s %(update_path)s', {'path': path, 'update_path': update_path}) + self.oldest_async_pendings.add_update( + acct, cont, kwargs['timestamp'] + ) if new_successes: update['successes'] = successes rewrite_pickle = True diff --git a/test/probe/test_object_async_update.py b/test/probe/test_object_async_update.py index 00aefcccbf..1e87c4d4f7 100644 --- a/test/probe/test_object_async_update.py +++ b/test/probe/test_object_async_update.py @@ -17,6 +17,7 @@ from io import BytesIO from unittest import main, SkipTest from uuid import uuid4 +import random from swiftclient import client from swiftclient.exceptions import ClientException @@ -333,5 +334,83 @@ class TestUpdateOverridesEC(ECProbeTest): self.assertEqual('test/ctype', listing[0]['content_type']) +class TestObjectUpdaterStats(ReplProbeTest): + + def setUp(self): + super(TestObjectUpdaterStats, self).setUp() + self.int_client = self.make_internal_client() + self.container_servers = Manager(['container-server']) + + def test_lots_of_asyncs(self): + # Create some (acct, cont) pairs + num_accounts = 3 + num_conts_per_a = 4 + ac_pairs = [] + for a in range(num_accounts): + acct = 'AUTH_user%03d' % a + self.int_client.create_account(acct) + for c in range(num_conts_per_a): + cont = 'cont%03d' % c + self.int_client.create_container(acct, cont) + ac_pairs.append((acct, cont)) + + # Shut down a couple container servers + for n in random.sample([1, 2, 3, 4], 2): + self.container_servers.stop(number=n) + + # Create a bunch of objects + num_objs_per_ac = 5 + for acct, cont in ac_pairs: + for o in range(num_objs_per_ac): + obj = 'obj%03d' % o + self.int_client.upload_object(BytesIO(b''), acct, cont, obj) + + all_asyncs = self.gather_async_pendings() + # Between 1-2 asyncs per object + total_objs = num_objs_per_ac * len(ac_pairs) + self.assertGreater(len(all_asyncs), total_objs) + self.assertLess(len(all_asyncs), total_objs * 2) + + # Run the updater and check stats + Manager(['object-updater']).once() + recons = [] + for onode in self.object_ring.devs: + recon = direct_client.direct_get_recon(onode, 'updater/object') + recons.append(recon) + + self.assertEqual(4, len(recons)) + found_counts = [] + ac_set = set() + for recon in recons: + updater_stats = recon['object_updater_stats'] + + found_counts.append( + updater_stats['failures_account_container_count'] + ) + + oldest_count = updater_stats[ + 'failures_oldest_timestamp_account_containers' + ]['oldest_count'] + self.assertEqual(oldest_count, 5) + + ts_ac_entries = updater_stats[ + 'failures_oldest_timestamp_account_containers' + ]['oldest_entries'] + self.assertEqual(len(ts_ac_entries), oldest_count) + + for entry in ts_ac_entries: + account = entry['account'] + container = entry['container'] + timestamp = entry['timestamp'] + self.assertIsNotNone(timestamp) + ac_set.add((account, container)) + + for ac in ac_set: + self.assertIn(ac, set(ac_pairs)) + + for found_count in found_counts: + self.assertLessEqual(found_count, len(ac_pairs)) + + if __name__ == '__main__': main() diff --git a/test/unit/common/middleware/test_recon.py b/test/unit/common/middleware/test_recon.py index 4b4656d26f..1757f9fbea 100644 --- a/test/unit/common/middleware/test_recon.py +++ b/test/unit/common/middleware/test_recon.py @@ -841,7 +841,7 @@ class TestReconSuccess(TestCase): self.fakecache.fakeout = from_cache_response rv = self.app.get_updater_info('object') self.assertEqual(self.fakecache.fakeout_calls, - [((['object_updater_sweep'], + [((['object_updater_sweep', 'object_updater_stats'], self._full_recon_path('object')), {})]) self.assertEqual(rv, {"object_updater_sweep": 0.79848217964172363}) diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 0ff89567b7..fcc867b11a 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -140,6 +140,8 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.max_objects_per_container_per_second, 0.0) self.assertEqual(daemon.per_container_ratelimit_buckets, 1000) self.assertEqual(daemon.max_deferred_updates, 10000) + self.assertEqual(daemon.oldest_async_pendings.max_entries, 100) + self.assertEqual(daemon.dump_count, 5) # non-defaults conf = { @@ -153,6 +155,8 @@ class TestObjectUpdater(unittest.TestCase): 'max_objects_per_container_per_second': '1.2', 'per_container_ratelimit_buckets': '100', 'max_deferred_updates': '0', + 'async_tracker_max_entries': '200', + 'async_tracker_dump_count': '10', } daemon = object_updater.ObjectUpdater(conf, logger=self.logger) self.assertEqual(daemon.devices, '/some/where/else') @@ -165,6 +169,8 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.max_objects_per_container_per_second, 1.2) self.assertEqual(daemon.per_container_ratelimit_buckets, 100) self.assertEqual(daemon.max_deferred_updates, 0) + self.assertEqual(daemon.oldest_async_pendings.max_entries, 200) + self.assertEqual(daemon.dump_count, 10) # check deprecated option daemon = object_updater.ObjectUpdater({'slowdown': '0.04'}, @@ -189,6 +195,8 @@ class TestObjectUpdater(unittest.TestCase): check_bad({'max_deferred_updates': '-1'}) check_bad({'max_deferred_updates': '1.1'}) check_bad({'max_deferred_updates': 'auto'}) + check_bad({'async_tracker_max_entries': '-10'}) + check_bad({'async_tracker_dump_count': '-5'}) @mock.patch('os.listdir') def test_listdir_with_exception(self, mock_listdir): @@ -2157,6 +2165,146 @@ class TestRateLimiterBucket(unittest.TestCase): self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)]) +class TestOldestAsyncPendingTracker(unittest.TestCase): + def setUp(self): + self.manager = object_updater.OldestAsyncPendingTracker(3) + + def test_add_update_new_pair(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.assertEqual(self.manager.ac_to_timestamp[('a1', 'c1')], 1000.0) + self.assertIn((1000.0, ('a1', 'c1')), self.manager.sorted_entries) + + def test_add_update_existing_pair(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.assertIn((1000.0, ('a1', 'c1')), self.manager.sorted_entries) + self.manager.add_update('a1', 'c1', 900.0) + self.assertEqual(self.manager.ac_to_timestamp[('a1', 'c1')], 900.0) + self.assertNotIn((1000.0, ('a1', 'c1')), self.manager.sorted_entries) + self.assertIn((900.0, ('a1', 'c1')), self.manager.sorted_entries) + self.manager.add_update('a1', 'c1', 1100.0) + self.assertEqual(self.manager.ac_to_timestamp[('a1', 'c1')], 900.0) + self.assertNotIn((1100.0, ('a1', 'c1')), self.manager.sorted_entries) + self.assertIn((900.0, ('a1', 'c1')), self.manager.sorted_entries) + + def test_eviction_when_limit_exceeded(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + self.manager.add_update('a3', 'c3', 3000.0) + + self.manager.add_update('a4', 'c4', 2500.0) + self.assertIn(('a4', 'c4'), self.manager.ac_to_timestamp) + self.assertNotIn(('a3', 'c3'), self.manager.ac_to_timestamp) + + def test_newest_pairs_not_added_when_limit_exceeded(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + self.manager.add_update('a3', 'c3', 3000.0) + + self.manager.add_update('a4', 'c4', 4000.0) + self.assertNotIn(('a4', 'c4'), self.manager.ac_to_timestamp) + self.assertIn(('a3', 'c3'), self.manager.ac_to_timestamp) + + def test_get_n_oldest_timestamp_acs(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + self.manager.add_update('a3', 'c3', 3000.0) + + oldest_pairs = self.manager.get_n_oldest_timestamp_acs(2) + expected_output = { + 'oldest_count': 2, + 'oldest_entries': [ + {'timestamp': 1000.0, 'account': 'a1', 'container': 'c1'}, + {'timestamp': 2000.0, 'account': 'a2', 'container': 'c2'}, + ], + } + self.assertEqual(oldest_pairs, expected_output) + + def test_get_oldest_timestamp(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + + oldest_timestamp = self.manager.get_oldest_timestamp() + self.assertEqual(oldest_timestamp, 1000.0) + + self.manager.add_update('a3', 'c3', 3000.0) + oldest_timestamp = self.manager.get_oldest_timestamp() + self.assertEqual(oldest_timestamp, 1000.0) + + self.manager.ac_to_timestamp.clear() + self.manager.sorted_entries = [] + oldest_timestamp = self.manager.get_oldest_timestamp() + self.assertEqual(oldest_timestamp, None) + + def test_get_oldest_timestamp_age(self): + current_time = time() + self.manager.add_update('a1', 'c1', current_time - 200.0) + + age = self.manager.get_oldest_timestamp_age() + self.assertAlmostEqual(age, 200.0, delta=1) + + def test_get_oldest_timestamp_age_no_updates(self): + age = self.manager.get_oldest_timestamp_age() + self.assertEqual(age, None) + + def test_eviction_when_multiple_same_timestamps(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 1000.0) + self.manager.add_update('a3', 'c3', 1000.0) + + self.manager.add_update('a4', 'c4', 500.0) + + expected_present = [('a1', 'c1'), ('a2', 'c2'), ('a4', 'c4')] + expected_absent = [('a3', 'c3')] + + for account_container in expected_present: + self.assertIn(account_container, self.manager.ac_to_timestamp) + + for account_container in expected_absent: + self.assertNotIn(account_container, self.manager.ac_to_timestamp) + + self.assertEqual(len(self.manager.ac_to_timestamp), 3) + + def test_reset(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + self.manager.reset() + self.assertEqual(len(self.manager.ac_to_timestamp), 0) + self.assertEqual(len(self.manager.sorted_entries), 0) + + def test_memory_usage(self): + self.manager.add_update('a1', 'c1', 1000.0) + memory_usage_before_reset = self.manager.get_memory_usage() + self.assertGreater(memory_usage_before_reset, 0) + + self.manager.reset() + memory_usage_after_reset = self.manager.get_memory_usage() + self.assertLess(memory_usage_after_reset, memory_usage_before_reset) + + def test_no_eviction_when_below_max_entries(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + + self.assertIn(('a1', 'c1'), self.manager.ac_to_timestamp) + self.assertIn(('a2', 'c2'), self.manager.ac_to_timestamp) + self.assertEqual(len(self.manager.ac_to_timestamp), 2) + + def test_get_n_oldest_timestamp_acs_exceeding_dump_count(self): + self.manager.add_update('a1', 'c1', 1000.0) + self.manager.add_update('a2', 'c2', 2000.0) + self.manager.add_update('a3', 'c3', 3000.0) + + oldest_pairs = self.manager.get_n_oldest_timestamp_acs(5) + expected_output = { + 'oldest_count': 3, + 'oldest_entries': [ + {'timestamp': 1000.0, 'account': 'a1', 'container': 'c1'}, + {'timestamp': 2000.0, 'account': 'a2', 'container': 'c2'}, + {'timestamp': 3000.0, 'account': 'a3', 'container': 'c3'}, + ], + } + self.assertEqual(oldest_pairs, expected_output) + + class TestSweepStats(unittest.TestCase): def test_copy(self): num_props = len(vars(object_updater.SweepStats()))