Finer grained ratelimit for update

Throw our stream of async_pendings through a hash ring; if a virtual
bucket gets hot, just start leaving its updates on the floor and move on.

It's off by default. If you do turn it on, every sweep will probably leave
a bunch of async updates pointed at a small set of containers in the queue
for the next sweep (so maybe turn it off again at some point).

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Change-Id: Idef25cd6026b02c1b5c10a9816c8c6cbe505e7ed
This commit is contained in:
Clay Gerrard 2021-12-07 17:19:54 -06:00 committed by Tim Burke
parent 1859f2e161
commit de88862981
3 changed files with 398 additions and 63 deletions

View File

@ -473,6 +473,16 @@ use = egg:swift#recon
# Send at most this many object updates per second
# objects_per_second = 50
#
# Send at most this many object updates per bucket per second. The value must
# be a float greater than or equal to 0. Set to 0 for unlimited.
# max_objects_per_container_per_second = 0
#
# The per_container ratelimit implementation uses a hashring to constrain
# memory requirements. Orders of magnitude more buckets will use (nominally)
# more memory, but will ratelimit smaller groups of containers. The value must
# be an integer greater than 0.
# per_container_ratelimit_buckets = 1000
#
# slowdown will sleep that amount between objects. Deprecated; use
# objects_per_second instead.
# slowdown = 0.01

View File

@ -19,6 +19,7 @@ import os
import signal
import sys
import time
import uuid
from random import random, shuffle
from eventlet import spawn, Timeout
@ -29,7 +30,8 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
non_negative_float, config_positive_int_value
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@ -39,18 +41,68 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
class BucketizedUpdateSkippingLimiter(object):
    """
    Iterator wrapper that drops update contexts whose bucket is too busy.

    Every update is mapped to one of ``num_buckets`` buckets by hashing its
    account and container together with a random per-instance salt. After a
    bucket yields an update, later updates that map to the same bucket are
    passed to ``skip_f`` instead of being yielded until the bucket's next
    allowed time (``1 / max_elements_per_group_per_second`` seconds later)
    has been reached.

    :param update_iterable: an async_pending update iterable; each element
                            is indexed with the ``'update'`` key
    :param num_buckets: number of buckets to divide container hashes into;
                        more buckets means fewer containers per bucket (a
                        busy container causes its whole bucket to skip).
                        Values below 1 are raised to 1.
    :param max_elements_per_group_per_second: tunable, when skipping kicks
                                              in; 0 means unlimited
    :param skip_f: function to call with update_ctx when skipping it
    """

    def __init__(self, update_iterable, num_buckets,
                 max_elements_per_group_per_second,
                 skip_f=lambda update_ctx: None):
        self.iterator = iter(update_iterable)
        # a larger number of buckets gives a smaller "blast radius" when one
        # container gets hot
        self.num_buckets = max(num_buckets, 1)
        # earliest time at which each bucket may yield again; a plain list
        # is cheap enough here
        self.next_update = [0.0] * self.num_buckets
        try:
            self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
        except ZeroDivisionError:
            # a rate of zero means unlimited: a negative delta keeps each
            # bucket's next allowed time behind the time it was recorded
            self.bucket_update_delta = -1
        self.skip_f = skip_f
        self.salt = str(uuid.uuid4())

    def __iter__(self):
        return self

    def _bucket_key(self, update):
        """Return the index of the bucket that ``update`` falls into."""
        acct, cont = split_update_path(update)
        digest = hash_path(acct, cont, self.salt)
        return int(digest, 16) % self.num_buckets

    def next(self):
        for update_ctx in self.iterator:
            bucket = self._bucket_key(update_ctx['update'])
            now = time.time()
            if self.next_update[bucket] <= now:
                # bucket is ready: push back its next allowed time and
                # hand the update to the caller
                self.next_update[bucket] = now + self.bucket_update_delta
                return update_ctx
            # bucket is still hot: report the skip and keep looking
            self.skip_f(update_ctx)
        raise StopIteration()

    __next__ = next
class SweepStats(object):
"""
Stats bucket for an update sweep
"""
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
unlinks=0, redirects=0):
unlinks=0, redirects=0, skips=0):
self.errors = errors
self.failures = failures
self.quarantines = quarantines
self.successes = successes
self.unlinks = unlinks
self.redirects = redirects
self.skips = skips
def copy(self):
return type(self)(self.errors, self.failures, self.quarantines,
@ -62,7 +114,8 @@ class SweepStats(object):
self.quarantines - other.quarantines,
self.successes - other.successes,
self.unlinks - other.unlinks,
self.redirects - other.redirects)
self.redirects - other.redirects,
self.skips - other.skips)
def reset(self):
self.errors = 0
@ -71,6 +124,7 @@ class SweepStats(object):
self.successes = 0
self.unlinks = 0
self.redirects = 0
self.skips = 0
def __str__(self):
keys = (
@ -80,10 +134,26 @@ class SweepStats(object):
(self.unlinks, 'unlinks'),
(self.errors, 'errors'),
(self.redirects, 'redirects'),
(self.skips, 'skips'),
)
return ', '.join('%d %s' % pair for pair in keys)
def split_update_path(update):
    """
    Return the (account, container) pair that an async update targets.

    N.B. updates to shards set the container_path key while the account and
    container keys are always the root; when container_path is set (and not
    empty) it takes precedence.

    :param update: the un-pickled async update data (a dict)
    :returns: a tuple of (account, container)
    """
    shard_path = update.get('container_path')
    if not shard_path:
        return update['account'], update['container']
    acct, cont = split_path('/' + shard_path, minsegs=2)
    return acct, cont
class ObjectUpdater(Daemon):
"""Update object information in container listings."""
@ -110,6 +180,10 @@ class ObjectUpdater(Daemon):
self.max_objects_per_second = \
float(conf.get('objects_per_second',
objects_per_second))
self.max_objects_per_container_per_second = non_negative_float(
conf.get('max_objects_per_container_per_second', 0))
self.per_container_ratelimit_buckets = config_positive_int_value(
conf.get('per_container_ratelimit_buckets', 1000))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.report_interval = float(conf.get('report_interval', 300))
@ -205,13 +279,40 @@ class ObjectUpdater(Daemon):
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
def _load_update(self, device, update_path):
    """
    Load the un-pickled update data from an async pending file.

    A file that no longer exists (ENOENT) is ignored. A file that fails to
    load for any other reason is logged, counted as a quarantine and moved
    to the ``quarantined/objects`` directory on the device; an attempt is
    then made to remove the directory that contained it.

    :param device: path to device
    :param update_path: path to pickled object update file
    :returns: the un-pickled update data, or None if the file was missing
              or had to be quarantined
    """
    try:
        # NOTE(review): un-pickling can execute arbitrary code; this
        # presumes async pending files are only written by trusted local
        # processes -- confirm.
        # Use a context manager so the file handle is closed promptly,
        # whether or not un-pickling succeeds.
        with open(update_path, 'rb') as async_pending:
            return pickle.load(async_pending)
    except Exception as e:
        if getattr(e, 'errno', None) == errno.ENOENT:
            # the file has gone missing; nothing to load or quarantine
            return
        self.logger.exception(
            'ERROR Pickle problem, quarantining %s', update_path)
        self.stats.quarantines += 1
        self.logger.increment('quarantines')
        target_path = os.path.join(device, 'quarantined', 'objects',
                                   os.path.basename(update_path))
        renamer(update_path, target_path, fsync=False)
        try:
            # If this was the last async_pending in the directory,
            # then this will succeed. Otherwise, it'll fail, and
            # that's okay.
            os.rmdir(os.path.dirname(update_path))
        except OSError:
            pass
        return
def _iter_async_pendings(self, device):
"""
Locate and yield all the async pendings on the device. Multiple updates
for the same object will come out in reverse-chronological order
(i.e. newest first) so that callers can skip stale async_pendings.
Locate and yield an update context for all the async pending files on
the device. Each update context contains details of the async pending
file location, its timestamp and the un-pickled update data.
Tries to clean up empty directories as it goes.
Async pending files that fail to load will be quarantined.
Only the most recent update for the same object is yielded; older
(stale) async pending files are unlinked as they are located.
The iterator tries to clean up empty directories as it goes.
"""
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
@ -238,12 +339,13 @@ class ObjectUpdater(Daemon):
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
for update in sorted(self._listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
for update_file in sorted(self._listdir(prefix_path),
reverse=True):
update_path = os.path.join(prefix_path, update_file)
if not os.path.isfile(update_path):
continue
try:
obj_hash, timestamp = update.split('-')
obj_hash, timestamp = update_file.split('-')
except ValueError:
self.stats.errors += 1
self.logger.increment('errors')
@ -280,9 +382,14 @@ class ObjectUpdater(Daemon):
raise
else:
last_obj_hash = obj_hash
yield {'device': device, 'policy': policy,
'path': update_path,
'obj_hash': obj_hash, 'timestamp': timestamp}
update = self._load_update(device, update_path)
if update is not None:
yield {'device': device,
'policy': policy,
'update_path': update_path,
'obj_hash': obj_hash,
'timestamp': timestamp,
'update': update}
def object_sweep(self, device):
"""
@ -297,13 +404,21 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
def skip_counting_f(update_ctx):
# in the future we could defer update_ctx
self.stats.skips += 1
self.logger.increment("skips")
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
ap_iter = BucketizedUpdateSkippingLimiter(
ap_iter, self.per_container_ratelimit_buckets,
self.max_objects_per_container_per_second,
skip_f=skip_counting_f)
with ContextPool(self.concurrency) as pool:
for update in ap_iter:
pool.spawn(self.process_object_update,
update['path'], update['device'], update['policy'])
for update_ctx in ap_iter:
pool.spawn(self.process_object_update, **update_ctx)
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
@ -326,6 +441,7 @@ class ObjectUpdater(Daemon):
'%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors, '
'%(redirects)d redirects '
'%(skips)d skips '
'(pid: %(pid)d)'),
{'device': device,
'elapsed': time.time() - start_time,
@ -335,36 +451,20 @@ class ObjectUpdater(Daemon):
'quarantines': sweep_totals.quarantines,
'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors,
'redirects': sweep_totals.redirects})
'redirects': sweep_totals.redirects,
'skips': sweep_totals.skips})
def process_object_update(self, update_path, device, policy):
def process_object_update(self, update_path, device, policy, update,
**kwargs):
"""
Process the object information to be updated and update.
:param update_path: path to pickled object update file
:param device: path to device
:param policy: storage policy of object update
:param update: the un-pickled update data
:param kwargs: un-used keys from update_ctx
"""
try:
update = pickle.load(open(update_path, 'rb'))
except Exception as e:
if getattr(e, 'errno', None) == errno.ENOENT:
return
self.logger.exception(
'ERROR Pickle problem, quarantining %s', update_path)
self.stats.quarantines += 1
self.logger.increment('quarantines')
target_path = os.path.join(device, 'quarantined', 'objects',
os.path.basename(update_path))
renamer(update_path, target_path, fsync=False)
try:
# If this was the last async_pending in the directory,
# then this will succeed. Otherwise, it'll fail, and
# that's okay.
os.rmdir(os.path.dirname(update_path))
except OSError:
pass
return
def do_update():
successes = update.get('successes', [])
@ -374,11 +474,7 @@ class ObjectUpdater(Daemon):
str(int(policy)))
headers_out.setdefault('X-Backend-Accept-Redirect', 'true')
headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true')
container_path = update.get('container_path')
if container_path:
acct, cont = split_path('/' + container_path, minsegs=2)
else:
acct, cont = update['account'], update['container']
acct, cont = split_update_path(update)
part, nodes = self.get_container_ring().get_nodes(acct, cont)
obj = '/%s/%s/%s' % (acct, cont, update['obj'])
events = [spawn(self.object_update,

View File

@ -12,13 +12,14 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import six.moves.cPickle as pickle
import mock
import os
import unittest
import random
import itertools
from collections import Counter
from contextlib import closing
from gzip import GzipFile
from tempfile import mkdtemp
@ -39,8 +40,7 @@ from swift.common.ring import RingData
from swift.common import utils
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import bytes_to_wsgi
from swift.common.utils import (
hash_path, normalize_timestamp, mkdirs, write_pickle)
from swift.common.utils import hash_path, normalize_timestamp, mkdirs
from swift.common.storage_policy import StoragePolicy, POLICIES
@ -125,6 +125,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.concurrency, 8)
self.assertEqual(daemon.updater_workers, 1)
self.assertEqual(daemon.max_objects_per_second, 50.0)
self.assertEqual(daemon.max_objects_per_container_per_second, 0.0)
self.assertEqual(daemon.per_container_ratelimit_buckets, 1000)
# non-defaults
conf = {
@ -135,6 +137,8 @@ class TestObjectUpdater(unittest.TestCase):
'concurrency': '2',
'updater_workers': '3',
'objects_per_second': '10.5',
'max_objects_per_container_per_second': '1.2',
'per_container_ratelimit_buckets': '100',
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.assertEqual(daemon.devices, '/some/where/else')
@ -144,6 +148,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.concurrency, 2)
self.assertEqual(daemon.updater_workers, 3)
self.assertEqual(daemon.max_objects_per_second, 10.5)
self.assertEqual(daemon.max_objects_per_container_per_second, 1.2)
self.assertEqual(daemon.per_container_ratelimit_buckets, 100)
# check deprecated option
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
@ -159,6 +165,12 @@ class TestObjectUpdater(unittest.TestCase):
check_bad({'concurrency': '1.0'})
check_bad({'slowdown': 'baz'})
check_bad({'objects_per_second': 'quux'})
check_bad({'max_objects_per_container_per_second': '-0.1'})
check_bad({'max_objects_per_container_per_second': 'auto'})
check_bad({'per_container_ratelimit_buckets': '1.2'})
check_bad({'per_container_ratelimit_buckets': '0'})
check_bad({'per_container_ratelimit_buckets': '-1'})
check_bad({'per_container_ratelimit_buckets': 'auto'})
@mock.patch('os.listdir')
def test_listdir_with_exception(self, mock_listdir):
@ -191,11 +203,12 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(len(log_lines), 0)
self.assertEqual(path, ['foo', 'bar'])
def test_object_sweep(self):
def check_with_idx(index, warn, should_skip):
if int(index) > 0:
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_object_sweep(self, mock_recon):
def check_with_idx(policy_index, warn, should_skip):
if int(policy_index) > 0:
asyncdir = os.path.join(self.sda1,
ASYNCDIR_BASE + "-" + index)
ASYNCDIR_BASE + "-" + policy_index)
else:
asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
@ -210,7 +223,8 @@ class TestObjectUpdater(unittest.TestCase):
os.path.join(self.sda1,
ASYNCDIR_BASE + '-' + 'twentington'),
os.path.join(self.sda1,
ASYNCDIR_BASE + '-' + str(int(index) + 100)))
ASYNCDIR_BASE + '-' + str(
int(policy_index) + 100)))
for not_dir in not_dirs:
with open(not_dir, 'w'):
@ -229,13 +243,13 @@ class TestObjectUpdater(unittest.TestCase):
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
if t == timestamps[0]:
expected.add((o_path, int(index)))
write_pickle({}, o_path)
expected.add((o_path, int(policy_index)))
self._write_dummy_pickle(o_path, 'account', 'container', o)
seen = set()
class MockObjectUpdater(object_updater.ObjectUpdater):
def process_object_update(self, update_path, device, policy):
def process_object_update(self, update_path, policy, **kwargs):
seen.add((update_path, int(policy)))
os.unlink(update_path)
@ -280,10 +294,10 @@ class TestObjectUpdater(unittest.TestCase):
ohash = hash_path('account', 'container', o)
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
write_pickle({}, o_path)
self._write_dummy_pickle(o_path, 'account', 'container', o)
class MockObjectUpdater(object_updater.ObjectUpdater):
def process_object_update(self, update_path, device, policy):
def process_object_update(self, update_path, **kwargs):
os.unlink(update_path)
self.stats.successes += 1
self.stats.unlinks += 1
@ -302,12 +316,13 @@ class TestObjectUpdater(unittest.TestCase):
def mock_time_function():
rv = now[0]
now[0] += 5
now[0] += 4
return rv
# With 10s between updates, time() advancing 5s every time we look,
# With 10s between updates, time() advancing 4s every time we look,
# and 5 async_pendings on disk, we should get at least two progress
# lines.
# lines. (time is incremented by 4 each time the update app iter yields
# and each time the elapsed time is sampled)
with mock.patch('swift.obj.updater.time',
mock.MagicMock(time=mock_time_function)), \
mock.patch.object(object_updater, 'ContextPool', MockPool):
@ -350,10 +365,10 @@ class TestObjectUpdater(unittest.TestCase):
ohash = hash_path('account', 'container%d' % policy.idx, o)
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
write_pickle({}, o_path)
self._write_dummy_pickle(o_path, 'account', 'container', o)
class MockObjectUpdater(object_updater.ObjectUpdater):
def process_object_update(self, update_path, device, policy):
def process_object_update(self, update_path, **kwargs):
os.unlink(update_path)
self.stats.successes += 1
self.stats.unlinks += 1
@ -1186,7 +1201,7 @@ class TestObjectUpdater(unittest.TestCase):
def test_obj_update_gone_missing(self):
# if you've got multiple updaters running (say, both a background
# and foreground process), process_object_update may get a file
# and foreground process), _load_update may get a file
# that doesn't exist
policies = list(POLICIES)
random.shuffle(policies)
@ -1208,13 +1223,227 @@ class TestObjectUpdater(unittest.TestCase):
odir,
'%s-%s' % (ohash, next(self.ts_iter).internal))
self.assertEqual(os.listdir(async_dir), [ohash[-3:]])
self.assertFalse(os.listdir(odir))
with mocked_http_conn():
with mock.patch('swift.obj.updater.dump_recon_cache'):
daemon.process_object_update(op_path, self.sda1, policies[0])
daemon._load_update(self.sda1, op_path)
self.assertEqual({}, daemon.logger.get_increment_counts())
self.assertEqual(os.listdir(async_dir), [ohash[-3:]])
self.assertFalse(os.listdir(odir))
def _write_dummy_pickle(self, path, a, c, o, cp=None):
    # Pickle a minimal PUT update for object a/c/o to ``path``; when ``cp``
    # is given it is recorded as the update's container_path.
    headers = {'X-Container-Timestamp': normalize_timestamp(0)}
    update = dict(op='PUT', account=a, container=c, obj=o, headers=headers)
    if cp:
        update['container_path'] = cp
    with open(path, 'wb') as fp:
        pickle.dump(update, fp)
def _make_async_pending_pickle(self, a, c, o, cp=None):
    # Create an async pending file for object a/c/o under self.async_dir,
    # in a sub-directory named after the last three characters of the
    # object hash and with a file name of <hash>-<current timestamp>.
    obj_hash = hash_path(a, c, o)
    suffix_dir = os.path.join(self.async_dir, obj_hash[-3:])
    mkdirs(suffix_dir)
    file_name = '%s-%s' % (obj_hash, normalize_timestamp(time()))
    self._write_dummy_pickle(
        os.path.join(suffix_dir, file_name), a, c, o, cp)
def _find_async_pending_files(self):
    # Return the names (not full paths) of all files found anywhere below
    # self.async_dir.
    return [name
            for _root, _dirs, names in os.walk(self.async_dir)
            for name in names]
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit(self, mock_recon):
    # With a limit of one update per container per second, a single sweep
    # is expected to send one update to each of the two containers and to
    # skip (leave on disk) all of the others.
    daemon = object_updater.ObjectUpdater({
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'max_objects_per_container_per_second': 1,
    }, logger=self.logger)
    self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
    os.mkdir(self.async_dir)

    # a batch of updates that all target container a/c1
    c1_obj_names = ['o%02d' % i for i in range(10)]
    for name in c1_obj_names:
        self._make_async_pending_pickle('a', 'c1', name)
    c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
    # make one more in a different container, with a container_path
    self._make_async_pending_pickle('a', 'c2', c1_obj_names[-1],
                                    cp='.shards_a/c2_shard')
    c2_part, _ = daemon.get_container_ring().get_nodes('.shards_a',
                                                       'c2_shard')
    expected_total = len(c1_obj_names) + 1
    self.assertEqual(expected_total,
                     len(self._find_async_pending_files()))

    # three 200 responses are queued for each update expected to be sent
    expected_success = 2
    responses = [200] * 3 * expected_success
    with mocked_http_conn(*responses) as fake_conn:
        daemon.run_once()

    expected_skipped = expected_total - expected_success
    self.assertEqual(expected_success, daemon.stats.successes)
    self.assertEqual(expected_skipped, daemon.stats.skips)
    self.assertEqual(expected_skipped,
                     len(self._find_async_pending_files()))
    # count the requests by their /device/partition/account/container
    requests_per_container = Counter()
    for req in fake_conn.requests:
        prefix = '/'.join(req['path'].split('/')[:5])
        requests_per_container[prefix] += 1
    self.assertEqual(
        requests_per_container,
        {'/sda1/%s/a/c1' % c1_part: 3,
         '/sda1/%s/.shards_a/c2_shard' % c2_part: 3})
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_unlimited(self, mock_recon):
    # A per-container limit of 0 means unlimited: every async pending is
    # expected to be sent and none skipped.
    daemon = object_updater.ObjectUpdater({
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'max_objects_per_container_per_second': 0,
    }, logger=self.logger)
    self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
    os.mkdir(self.async_dir)

    # a batch of updates that all target container a/c1
    c1_obj_names = ['o%02d' % i for i in range(10)]
    for name in c1_obj_names:
        self._make_async_pending_pickle('a', 'c1', name)
    c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
    # make one more in a different container, with a container_path
    self._make_async_pending_pickle('a', 'c2', c1_obj_names[-1],
                                    cp='.shards_a/c2_shard')
    c2_part, _ = daemon.get_container_ring().get_nodes('.shards_a',
                                                       'c2_shard')
    expected_total = len(c1_obj_names) + 1
    self.assertEqual(expected_total,
                     len(self._find_async_pending_files()))

    # three 200 responses are queued for every update on disk
    responses = [200] * 3 * expected_total
    with mocked_http_conn(*responses):
        daemon.run_once()

    self.assertEqual(expected_total, daemon.stats.successes)
    self.assertEqual(0, daemon.stats.skips)
    self.assertEqual([], self._find_async_pending_files())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_slow_responses(self, mock_recon):
    # With a limit of 10 updates per container per second, an update is
    # only let through once more than 0.1s has passed since the previous
    # one for the same container; the patched spawn supplies that delay.
    daemon = object_updater.ObjectUpdater({
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'max_objects_per_container_per_second': 10,
    }, logger=self.logger)
    self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
    os.mkdir(self.async_dir)

    # all updates for same container
    expected_total = 4
    for i in range(expected_total):
        self._make_async_pending_pickle('a', 'c1', 'o%02d' % i)
    self.assertEqual(expected_total,
                     len(self._find_async_pending_files()))

    # seconds to sleep before running each spawned update, in call order
    latencies = [.11, 0, .11, 0]
    expected_success = 2
    responses = [200] * 3 * expected_success

    def fake_spawn(pool, *args, **kwargs):
        # make each update delay the iter being called again, then run
        # the spawned callable inline
        delay = latencies.pop(0)
        eventlet.sleep(delay)
        func, func_args = args[0], args[1:]
        return func(*func_args, **kwargs)

    with mocked_http_conn(*responses):
        with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn):
            daemon.run_once()

    expected_skipped = expected_total - expected_success
    self.assertEqual(expected_success, daemon.stats.successes)
    self.assertEqual(expected_skipped, daemon.stats.skips)
    self.assertEqual(expected_skipped,
                     len(self._find_async_pending_files()))
class TestObjectUpdaterFunctions(unittest.TestCase):
    # Tests for the module level helper functions in object_updater.

    def test_split_update_path(self):
        root_update = {
            'op': 'PUT',
            'account': 'a',
            'container': 'c',
            'obj': 'o',
            'headers': {
                'X-Container-Timestamp': normalize_timestamp(0),
            }
        }
        # no container_path key: the root account and container are used
        self.assertEqual(
            ('a', 'c'), object_updater.split_update_path(root_update))

        # a container_path of None is treated the same as a missing key
        none_path_update = dict(root_update, container_path=None)
        self.assertEqual(
            ('a', 'c'), object_updater.split_update_path(none_path_update))

        # a populated container_path takes precedence over the root
        shard_update = dict(root_update,
                            container_path='.shards_a/c_shard_n')
        self.assertEqual(
            ('.shards_a', 'c_shard_n'),
            object_updater.split_update_path(shard_update))
class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
    # Tests for object_updater.BucketizedUpdateSkippingLimiter.

    def test_init(self):
        limiter = object_updater.BucketizedUpdateSkippingLimiter(
            [3, 1], 1000, 10)
        self.assertEqual(1000, limiter.num_buckets)
        self.assertEqual(0.1, limiter.bucket_update_delta)
        self.assertEqual([3, 1], list(limiter.iterator))

        # rate of 0 implies unlimited
        limiter = object_updater.BucketizedUpdateSkippingLimiter(
            iter([3, 1]), 9, 0)
        self.assertEqual(9, limiter.num_buckets)
        self.assertEqual(-1, limiter.bucket_update_delta)
        self.assertEqual([3, 1], list(limiter.iterator))

        # num_buckets is collared at 1
        limiter = object_updater.BucketizedUpdateSkippingLimiter(
            iter([3, 1]), 0, 1)
        self.assertEqual(1, limiter.num_buckets)
        self.assertEqual(1, limiter.bucket_update_delta)
        self.assertEqual([3, 1], list(limiter.iterator))

    def test_iteration_unlimited(self):
        # verify iteration at unlimited rate: nothing is filtered out
        update_ctxs = []
        for i in range(20):
            update_ctxs.append(
                {'update': {'account': '%d' % i, 'container': '%s' % i}})
        limiter = object_updater.BucketizedUpdateSkippingLimiter(
            iter(update_ctxs), 9, 0)
        self.assertEqual(update_ctxs, list(limiter))

    def test_iteration_ratelimited(self):
        # verify iteration at limited rate - single bucket, so only the
        # first update is expected to be yielded
        update_ctxs = []
        for i in range(2):
            update_ctxs.append(
                {'update': {'account': '%d' % i, 'container': '%s' % i}})
        limiter = object_updater.BucketizedUpdateSkippingLimiter(
            iter(update_ctxs), 1, 0.1)
        self.assertEqual(update_ctxs[:1], list(limiter))

    def test_iteration_ratelimited_with_callback(self):
        # verify that skipped updates are handed to the skip_f callback
        # when iterating at limited rate - single bucket
        skipped = []

        def on_skip(update_ctx):
            skipped.append(update_ctx)

        update_ctxs = []
        for i in range(2):
            update_ctxs.append(
                {'update': {'account': '%d' % i, 'container': '%s' % i}})
        limiter = object_updater.BucketizedUpdateSkippingLimiter(
            iter(update_ctxs), 1, 0.1, skip_f=on_skip)
        self.assertEqual(update_ctxs[:1], list(limiter))
        self.assertEqual(update_ctxs[1:], skipped)
if __name__ == '__main__':
unittest.main()