From 5227cb702b744bbe9aaecfff002604be45f64a8c Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Wed, 23 Mar 2022 18:26:50 +0000 Subject: [PATCH] Refactor rate-limiting helper into a class Replaces the ratelimit_sleep helper function with an EventletRateLimiter class that encapsulates the rate-limiting state that previously needed to be maintained by the caller of the function. The ratelimit_sleep function is retained but deprecated, and now forwards to the EventletRateLimiter class. The object updater's BucketizedUpdateSkippingLimiter is refactored to take advantage of the new EventletRateLimiter class. The rate limiting algorithm is corrected to make the allowed request rate more uniform: previously pairs of requests would be allowed in rapid succession before the rate limiter would the sleep for the time allowance consumed by those two requests; now the rate limiter will sleep as required after each allowed request. For example, before a max_rate of 1 per second might result in 2 requests being allowed followed by a 2 second sleep. That is corrected to be a sleep of 1 second after each request. Change-Id: Ibcf4dbeb4332dee7e9e233473d4ceaf75a5a85c7 --- swift/common/db_auditor.py | 7 +- swift/common/utils.py | 124 +++++++++++++++++++----- swift/container/updater.py | 8 +- swift/obj/auditor.py | 16 ++- swift/obj/updater.py | 38 ++++---- test/unit/common/middleware/test_slo.py | 8 +- test/unit/common/test_utils.py | 119 +++++++++++++++++++++++ test/unit/obj/test_updater.py | 51 ++++------ 8 files changed, 271 insertions(+), 100 deletions(-) diff --git a/swift/common/db_auditor.py b/swift/common/db_auditor.py index 635fcc6ecd..229cfbdc4a 100644 --- a/swift/common/db_auditor.py +++ b/swift/common/db_auditor.py @@ -22,7 +22,7 @@ from eventlet import Timeout import swift.common.db from swift.common.utils import get_logger, audit_location_generator, \ - config_true_value, dump_recon_cache, ratelimit_sleep + config_true_value, dump_recon_cache, EventletRateLimiter from swift.common.daemon import Daemon from swift.common.exceptions import DatabaseAuditorException from swift.common.recon import DEFAULT_RECON_CACHE_PATH, \ @@ -56,9 +56,9 @@ class DatabaseAuditor(Daemon): self.logging_interval = 3600 # once an hour self.passes = 0 self.failures = 0 - self.running_time = 0 self.max_dbs_per_second = \ float(conf.get('{}s_per_second'.format(self.server_type), 200)) + self.rate_limiter = EventletRateLimiter(self.max_dbs_per_second) swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) self.recon_cache_path = conf.get('recon_cache_path', @@ -88,8 +88,7 @@ class DatabaseAuditor(Daemon): reported = time.time() self.passes = 0 self.failures = 0 - self.running_time = ratelimit_sleep( - self.running_time, self.max_dbs_per_second) + self.rate_limiter.wait() return reported def run_forever(self, *args, **kwargs): diff --git a/swift/common/utils.py b/swift/common/utils.py index 06c9ded308..a3770972da 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -1723,7 +1723,7 @@ class RateLimitedIterator(object): self.iterator = iter(iterable) self.elements_per_second = elements_per_second self.limit_after = limit_after - self.running_time = 0 + self.rate_limiter = EventletRateLimiter(elements_per_second) self.ratelimit_if = ratelimit_if def __iter__(self): @@ -1736,8 +1736,7 @@ class RateLimitedIterator(object): if self.limit_after > 0: self.limit_after -= 1 else: - self.running_time = ratelimit_sleep(self.running_time, - self.elements_per_second) + self.rate_limiter.wait() return next_value __next__ = next @@ -3451,6 +3450,91 @@ def audit_location_generator(devices, datadir, suffix='', hook_post_device(os.path.join(devices, device)) +class AbstractRateLimiter(object): + # 1,000 milliseconds = 1 second + clock_accuracy = 1000.0 + + def __init__(self, max_rate, rate_buffer=5, running_time=0): + """ + :param max_rate: The maximum rate per second allowed for the process. + Must be > 0 to engage rate-limiting behavior. + :param rate_buffer: Number of seconds the rate counter can drop and be + allowed to catch up (at a faster than listed rate). A larger number + will result in larger spikes in rate but better average accuracy. + :param running_time: The running time in milliseconds of the next + allowable request. Setting this to any time in the past will cause + the rate limiter to immediately allow requests; setting this to a + future time will cause the rate limiter to deny requests until that + time. + """ + self.max_rate = max_rate + self.rate_buffer_ms = rate_buffer * self.clock_accuracy + self.running_time = running_time + self.time_per_incr = (self.clock_accuracy / self.max_rate + if self.max_rate else 0) + + def _sleep(self, seconds): + # subclasses should override to implement a sleep + raise NotImplementedError + + def is_allowed(self, incr_by=1, now=None, block=False): + """ + Check if the calling process is allowed to proceed according to the + rate limit. + + :param incr_by: How much to increment the counter. Useful if you want + to ratelimit 1024 bytes/sec and have differing sizes + of requests. Must be > 0 to engage rate-limiting + behavior. + :param now: The time in seconds; defaults to time.time() + :param block: if True, the call will sleep until the calling process + is allowed to proceed; otherwise the call returns immediately. + :return: True if the the calling process is allowed to proceed, False + otherwise. + """ + if self.max_rate <= 0 or incr_by <= 0: + return True + + now = now or time.time() + # Convert seconds to milliseconds + now = now * self.clock_accuracy + + # Calculate time per request in milliseconds + time_per_request = self.time_per_incr * float(incr_by) + + # Convert rate_buffer to milliseconds and compare + if now - self.running_time > self.rate_buffer_ms: + self.running_time = now + + if now >= self.running_time: + self.running_time += time_per_request + allowed = True + elif block: + sleep_time = (self.running_time - now) / self.clock_accuracy + # increment running time before sleeping in case the sleep allows + # another thread to inspect the rate limiter state + self.running_time += time_per_request + # Convert diff to a floating point number of seconds and sleep + self._sleep(sleep_time) + allowed = True + else: + allowed = False + + return allowed + + def wait(self, incr_by=1, now=None): + self.is_allowed(incr_by=incr_by, now=now, block=True) + + +class EventletRateLimiter(AbstractRateLimiter): + def __init__(self, max_rate, rate_buffer=5, running_time=0): + super(EventletRateLimiter, self).__init__( + max_rate, rate_buffer, running_time) + + def _sleep(self, seconds): + eventlet.sleep(seconds) + + def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5): """ Will eventlet.sleep() for the appropriate time so that the max_rate @@ -3471,30 +3555,18 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5): A larger number will result in larger spikes in rate but better average accuracy. Must be > 0 to engage rate-limiting behavior. + :return: The absolute time for the next interval in milliseconds; note + that time could have passed well beyond that point, but the next call + will catch that and skip the sleep. """ - if max_rate <= 0 or incr_by <= 0: - return running_time - - # 1,000 milliseconds = 1 second - clock_accuracy = 1000.0 - - # Convert seconds to milliseconds - now = time.time() * clock_accuracy - - # Calculate time per request in milliseconds - time_per_request = clock_accuracy * (float(incr_by) / max_rate) - - # Convert rate_buffer to milliseconds and compare - if now - running_time > rate_buffer * clock_accuracy: - running_time = now - elif running_time - now > time_per_request: - # Convert diff back to a floating point number of seconds and sleep - eventlet.sleep((running_time - now) / clock_accuracy) - - # Return the absolute time for the next interval in milliseconds; note - # that time could have passed well beyond that point, but the next call - # will catch that and skip the sleep. - return running_time + time_per_request + warnings.warn( + 'ratelimit_sleep() is deprecated; use the ``EventletRateLimiter`` ' + 'class instead.', DeprecationWarning + ) + rate_limit = EventletRateLimiter(max_rate, rate_buffer=rate_buffer, + running_time=running_time) + rate_limit.wait(incr_by=incr_by) + return rate_limit.running_time class ContextPool(GreenPool): diff --git a/swift/container/updater.py b/swift/container/updater.py index a22bf0b716..fc7b60eaff 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -31,7 +31,7 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout, LockTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, config_true_value, \ - dump_recon_cache, majority_size, Timestamp, ratelimit_sleep, \ + dump_recon_cache, majority_size, Timestamp, EventletRateLimiter, \ eventlet_monkey_patch from swift.common.daemon import Daemon from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR @@ -59,10 +59,10 @@ class ContainerUpdater(Daemon): float(conf.get('slowdown', '0.01')) + 0.01) else: containers_per_second = 50 - self.containers_running_time = 0 self.max_containers_per_second = \ float(conf.get('containers_per_second', containers_per_second)) + self.rate_limiter = EventletRateLimiter(self.max_containers_per_second) self.node_timeout = float(conf.get('node_timeout', 3)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.no_changes = 0 @@ -226,9 +226,7 @@ class ContainerUpdater(Daemon): self.logger.exception( "Error processing container %s: %s", dbfile, e) - self.containers_running_time = ratelimit_sleep( - self.containers_running_time, - self.max_containers_per_second) + self.rate_limiter.wait() def process_container(self, dbfile): """ diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index e0d95bb4d9..f9013748a9 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -30,7 +30,7 @@ from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES from swift.common.utils import ( config_auto_int_value, dump_recon_cache, get_logger, list_from_csv, - listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep, + listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter, readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter) from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH @@ -85,8 +85,10 @@ class AuditorWorker(object): self.auditor_type = 'ZBF' self.log_time = int(conf.get('log_time', 3600)) self.last_logged = 0 - self.files_running_time = 0 - self.bytes_running_time = 0 + self.files_rate_limiter = EventletRateLimiter( + self.max_files_per_second) + self.bytes_rate_limiter = EventletRateLimiter( + self.max_bytes_per_second) self.bytes_processed = 0 self.total_bytes_processed = 0 self.total_files_processed = 0 @@ -146,8 +148,7 @@ class AuditorWorker(object): loop_time = time.time() self.failsafe_object_audit(location) self.logger.timing_since('timing', loop_time) - self.files_running_time = ratelimit_sleep( - self.files_running_time, self.max_files_per_second) + self.files_rate_limiter.wait() self.total_files_processed += 1 now = time.time() if now - self.last_logged >= self.log_time: @@ -266,10 +267,7 @@ class AuditorWorker(object): with closing(reader): for chunk in reader: chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) + self.bytes_rate_limiter.wait(incr_by=chunk_len) self.bytes_processed += chunk_len self.total_bytes_processed += chunk_len for watcher in self.watchers: diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 01c609f13c..2ee7c35fad 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -33,7 +33,8 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ - non_negative_float, config_positive_int_value, non_negative_int + non_negative_float, config_positive_int_value, non_negative_int, \ + EventletRateLimiter from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY -class RateLimiterBucket(object): - def __init__(self, update_delta): - self.update_delta = update_delta - self.last_time = 0 +class RateLimiterBucket(EventletRateLimiter): + """ + Extends EventletRateLimiter to also maintain a deque of items that have + been deferred due to rate-limiting, and to provide a comparator for sorting + instanced by readiness. + """ + def __init__(self, max_updates_per_second): + super(RateLimiterBucket, self).__init__(max_updates_per_second, + rate_buffer=0) self.deque = deque() - @property - def wait_until(self): - return self.last_time + self.update_delta - def __len__(self): return len(self.deque) @@ -62,10 +64,10 @@ class RateLimiterBucket(object): __nonzero__ = __bool__ # py2 def __lt__(self, other): - # used to sort buckets by readiness + # used to sort RateLimiterBuckets by readiness if isinstance(other, RateLimiterBucket): - return self.wait_until < other.wait_until - return self.wait_until < other + return self.running_time < other.running_time + return self.running_time < other class BucketizedUpdateSkippingLimiter(object): @@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object): self.stats = stats # if we want a smaller "blast radius" we could make this number bigger self.num_buckets = max(num_buckets, 1) - try: - self.bucket_update_delta = 1.0 / max_elements_per_group_per_second - except ZeroDivisionError: - self.bucket_update_delta = -1 self.max_deferred_elements = max_deferred_elements self.deferred_buckets = deque() self.drain_until = drain_until self.salt = str(uuid.uuid4()) - self.buckets = [RateLimiterBucket(self.bucket_update_delta) + self.buckets = [RateLimiterBucket(max_elements_per_group_per_second) for _ in range(self.num_buckets)] self.buckets_ordered_by_readiness = None @@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object): for update_ctx in self.iterator: bucket = self.buckets[self._bucket_key(update_ctx['update'])] now = self._get_time() - if now >= bucket.wait_until: + if bucket.is_allowed(now=now): # no need to ratelimit, just return next update - bucket.last_time = now return update_ctx self.stats.deferrals += 1 @@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object): bucket = self.buckets_ordered_by_readiness.get_nowait() if now < self.drain_until: # wait for next element to be ready - time.sleep(max(0, bucket.wait_until - now)) + bucket.wait(now=now) # drain the most recently deferred element item = bucket.deque.pop() if bucket: # bucket has more deferred elements, re-insert in queue in # correct chronological position - bucket.last_time = self._get_time() self.buckets_ordered_by_readiness.put(bucket) self.stats.drains += 1 self.logger.increment("drains") diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index 045778a167..b7b1365e84 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -2423,7 +2423,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0, 2.0, 2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 11) # give the client the first 4 segments without ratelimiting; we'll # sleep less @@ -2435,7 +2435,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 7) # ratelimit segments under 35 bytes; this affects a-f del sleeps[:] @@ -2446,7 +2446,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 5) # ratelimit segments under 36 bytes; this now affects a-g, netting # us one more sleep than before @@ -2458,7 +2458,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 6) def test_get_manifest_with_submanifest(self): req = Request.blank( diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index bb9e31eb35..4f134e88f9 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -5832,6 +5832,125 @@ class TestAffinityLocalityPredicate(unittest.TestCase): utils.affinity_locality_predicate, 'r1z1=1') +class TestEventletRateLimiter(unittest.TestCase): + def test_init(self): + rl = utils.EventletRateLimiter(0.1) + self.assertEqual(0.1, rl.max_rate) + self.assertEqual(0.0, rl.running_time) + self.assertEqual(5000, rl.rate_buffer_ms) + + rl = utils.EventletRateLimiter( + 0.2, rate_buffer=2, running_time=1234567.8) + self.assertEqual(0.2, rl.max_rate) + self.assertEqual(1234567.8, rl.running_time) + self.assertEqual(2000, rl.rate_buffer_ms) + + def test_non_blocking(self): + rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=0) + with patch('time.time',) as mock_time: + with patch('eventlet.sleep') as mock_sleep: + mock_time.return_value = 0 + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + + mock_time.return_value = 9.99 + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + mock_time.return_value = 10.0 + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + + rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=20) + with patch('time.time',) as mock_time: + with patch('eventlet.sleep') as mock_sleep: + mock_time.return_value = 20.0 + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + + def _do_test(self, max_rate, running_time, start_time, rate_buffer, + incr_by=1.0): + rate_limiter = utils.EventletRateLimiter( + max_rate, + running_time=1000 * running_time, # msecs + rate_buffer=rate_buffer) + grant_times = [] + current_time = [start_time] + + def mock_time(): + return current_time[0] + + def mock_sleep(duration): + current_time[0] += duration + + with patch('time.time', mock_time): + with patch('eventlet.sleep', mock_sleep): + for i in range(5): + rate_limiter.wait(incr_by=incr_by) + grant_times.append(current_time[0]) + return [round(t, 6) for t in grant_times] + + def test_ratelimit(self): + grant_times = self._do_test(1, 0, 1, 0) + self.assertEqual([1, 2, 3, 4, 5], grant_times) + + grant_times = self._do_test(10, 0, 1, 0) + self.assertEqual([1, 1.1, 1.2, 1.3, 1.4], grant_times) + + grant_times = self._do_test(.1, 0, 1, 0) + self.assertEqual([1, 11, 21, 31, 41], grant_times) + + grant_times = self._do_test(.1, 11, 1, 0) + self.assertEqual([11, 21, 31, 41, 51], grant_times) + + def test_incr_by(self): + grant_times = self._do_test(1, 0, 1, 0, incr_by=2.5) + self.assertEqual([1, 3.5, 6, 8.5, 11], grant_times) + + def test_burst(self): + grant_times = self._do_test(1, 1, 4, 0) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + grant_times = self._do_test(1, 1, 4, 1) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + grant_times = self._do_test(1, 1, 4, 2) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + grant_times = self._do_test(1, 1, 4, 3) + self.assertEqual([4, 4, 4, 4, 5], grant_times) + + grant_times = self._do_test(1, 1, 4, 4) + self.assertEqual([4, 4, 4, 4, 5], grant_times) + + grant_times = self._do_test(1, 1, 3, 3) + self.assertEqual([3, 3, 3, 4, 5], grant_times) + + grant_times = self._do_test(1, 0, 2, 3) + self.assertEqual([2, 2, 2, 3, 4], grant_times) + + grant_times = self._do_test(1, 1, 3, 3) + self.assertEqual([3, 3, 3, 4, 5], grant_times) + + grant_times = self._do_test(1, 0, 3, 3) + self.assertEqual([3, 3, 3, 3, 4], grant_times) + + grant_times = self._do_test(1, 1, 3, 3) + self.assertEqual([3, 3, 3, 4, 5], grant_times) + + grant_times = self._do_test(1, 0, 4, 3) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + class TestRateLimitedIterator(unittest.TestCase): def run_under_pseudo_time( diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index fa3fde32f5..941ec3dc36 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -1652,7 +1652,7 @@ class TestObjectUpdater(unittest.TestCase): len(self._find_async_pending_files())) # indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral # queue by 4; 4, 3 are then drained - latencies = [0, 0.05, .051, 0, 0, 0, .11, .01] + latencies = [0, 0.05, .051, 0, 0, 0, .11] expected_success = 4 contexts_fed_in = [] @@ -1693,7 +1693,7 @@ class TestObjectUpdater(unittest.TestCase): fake_object_update), \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ - mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: daemon.run_once() self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success @@ -1719,7 +1719,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual([aorder[o] for o in expected_updates_sent], [aorder[o] for o in actual_updates_sent]) - self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats) + self.assertEqual([0, 0, 0, 0, 0, 1, 1], captured_skips_stats) expected_deferrals = [ [], @@ -1729,7 +1729,6 @@ class TestObjectUpdater(unittest.TestCase): [objs_fed_in[1], objs_fed_in[3]], [objs_fed_in[3], objs_fed_in[4]], [objs_fed_in[3]], # note: rightmost element is drained - [objs_fed_in[3]], ] self.assertEqual( expected_deferrals, @@ -1776,7 +1775,7 @@ class TestObjectUpdater(unittest.TestCase): # first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred # last 2 deferred items sent before interval elapses latencies = [0, .05, 0.051, 0, 0, .11, 0, 0, - 0.1, 0, 0.1, 0] # total 0.42 + 0.1, 0.1, 0] # total 0.411 expected_success = 5 contexts_fed_in = [] @@ -1820,7 +1819,7 @@ class TestObjectUpdater(unittest.TestCase): fake_object_update), \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ - mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: daemon.run_once() self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success @@ -1840,7 +1839,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(expected_updates_sent, actual_updates_sent) # skips (un-drained deferrals) not reported until end of cycle - self.assertEqual([0] * 12, captured_skips_stats) + self.assertEqual([0] * 10, captured_skips_stats) objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in] expected_deferrals = [ @@ -1856,8 +1855,6 @@ class TestObjectUpdater(unittest.TestCase): # note: rightmost element is drained [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]], [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], - [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], - [objs_fed_in[1], objs_fed_in[3]], [objs_fed_in[1], objs_fed_in[3]], ] self.assertEqual( @@ -1911,21 +1908,21 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): it = object_updater.BucketizedUpdateSkippingLimiter( [3, 1], self.logger, self.stats, 1000, 10) self.assertEqual(1000, it.num_buckets) - self.assertEqual(0.1, it.bucket_update_delta) + self.assertEqual([10] * 1000, [b.max_rate for b in it.buckets]) self.assertEqual([3, 1], [x for x in it.iterator]) # rate of 0 implies unlimited it = object_updater.BucketizedUpdateSkippingLimiter( iter([3, 1]), self.logger, self.stats, 9, 0) self.assertEqual(9, it.num_buckets) - self.assertEqual(-1, it.bucket_update_delta) + self.assertEqual([0] * 9, [b.max_rate for b in it.buckets]) self.assertEqual([3, 1], [x for x in it.iterator]) # num_buckets is collared at 1 it = object_updater.BucketizedUpdateSkippingLimiter( iter([3, 1]), self.logger, self.stats, 0, 1) self.assertEqual(1, it.num_buckets) - self.assertEqual(1, it.bucket_update_delta) + self.assertEqual([1], [b.max_rate for b in it.buckets]) self.assertEqual([3, 1], [x for x in it.iterator]) def test_iteration_unlimited(self): @@ -1963,7 +1960,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # enough capacity for all deferrals with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now, now]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, max_deferred_elements=2, @@ -1982,7 +1979,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # only space for one deferral with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, max_deferred_elements=1, @@ -2000,7 +1997,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # only time for one deferral with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now + 20, now + 20]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, max_deferred_elements=2, @@ -2019,7 +2016,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now, now + 20, now + 20]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs), self.logger, self.stats, 1, 10, max_deferred_elements=2, @@ -2048,7 +2045,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # deferrals stick in both buckets with mock.patch('swift.obj.updater.time.time', side_effect=[next(time_iter) for _ in range(12)]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs_1 + update_ctxs_2), self.logger, self.stats, 4, 10, @@ -2073,7 +2070,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # oldest deferral bumped from one bucket due to max_deferrals == 3 with mock.patch('swift.obj.updater.time.time', side_effect=[next(time_iter) for _ in range(10)]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs_1 + update_ctxs_2), self.logger, self.stats, 4, 10, @@ -2097,7 +2094,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # older deferrals bumped from one bucket due to max_deferrals == 2 with mock.patch('swift.obj.updater.time.time', side_effect=[next(time_iter) for _ in range(10)]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs_1 + update_ctxs_2), self.logger, self.stats, 4, 10, @@ -2119,16 +2116,8 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): class TestRateLimiterBucket(unittest.TestCase): - def test_wait_until(self): - b1 = object_updater.RateLimiterBucket(10) - self.assertEqual(10, b1.wait_until) - b1.last_time = b1.wait_until - self.assertEqual(20, b1.wait_until) - b1.last_time = 12345.678 - self.assertEqual(12355.678, b1.wait_until) - def test_len(self): - b1 = object_updater.RateLimiterBucket(10) + b1 = object_updater.RateLimiterBucket(0.1) b1.deque.append(1) b1.deque.append(2) self.assertEqual(2, len(b1)) @@ -2136,7 +2125,7 @@ class TestRateLimiterBucket(unittest.TestCase): self.assertEqual(1, len(b1)) def test_bool(self): - b1 = object_updater.RateLimiterBucket(10) + b1 = object_updater.RateLimiterBucket(0.1) self.assertFalse(b1) b1.deque.append(1) self.assertTrue(b1) @@ -2148,13 +2137,13 @@ class TestRateLimiterBucket(unittest.TestCase): b1 = object_updater.RateLimiterBucket(10) b2 = object_updater.RateLimiterBucket(10) - b2.last_time = next(time_iter) + b2.running_time = next(time_iter) buckets = PriorityQueue() buckets.put(b1) buckets.put(b2) self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)]) - b1.last_time = next(time_iter) + b1.running_time = next(time_iter) buckets.put(b1) buckets.put(b2) self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])